Unverified commit 298f210d authored by ShenLiang and committed by GitHub

Support control flow in DataParallel (#32826)

* fix the find_unused_parameters default value (now False by default)
Parent a9e53050
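For context, a minimal dygraph sketch of the scenario this change targets: the model's forward contains control flow, so only one branch's parameters receive gradients in a given step, and DataParallel therefore needs find_unused_parameters=True. The BranchNet layer and the random data below are illustrative and not part of this commit; launch with paddle.distributed.launch or spawn as usual.

import paddle
import paddle.nn as nn
import paddle.distributed as dist

class BranchNet(nn.Layer):
    # hypothetical model: a data-dependent branch means some parameters
    # may receive no gradient in any particular step
    def __init__(self):
        super(BranchNet, self).__init__()
        self._linear_a = nn.Linear(10, 1)
        self._linear_b = nn.Linear(10, 1)

    def forward(self, x):
        if paddle.mean(x) > 0:
            return self._linear_a(x)
        return self._linear_b(x)

dist.init_parallel_env()
model = BranchNet()
# control flow may leave parameters unused, so opt back in; models that
# always use every parameter can rely on the new default of False
model = paddle.DataParallel(model, find_unused_parameters=True)
opt = paddle.optimizer.SGD(learning_rate=0.01, parameters=model.parameters())

for _ in range(4):
    x = paddle.randn([8, 10], 'float32')
    loss = model(x).mean()
    loss.backward()
    opt.step()
    opt.clear_grad()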
......@@ -172,7 +172,7 @@ message DistributedStrategy {
optional bool fp16_allreduce = 25 [ default = false ];
optional bool sharding = 26 [ default = false ];
optional float last_comm_group_size_MB = 27 [ default = 1 ];
optional bool find_unused_parameters = 28 [ default = true ];
optional bool find_unused_parameters = 28 [ default = false ];
optional bool tensor_parallel = 29 [ default = false ];
optional bool without_graph_optimization = 30 [ default = false ];
......
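When training goes through fleet rather than the bare DataParallel wrapper, the same switch is read from DistributedStrategy, whose proto default changes above. A minimal sketch of opting back into per-step detection (model construction and the training loop are omitted here):

from paddle.distributed import fleet

strategy = fleet.DistributedStrategy()
# the proto default is now False; enable it only when the model has
# control flow or otherwise leaves some parameters without gradients
strategy.find_unused_parameters = True

fleet.init(is_collective=True, strategy=strategy)
# model = fleet.distributed_model(model)  # wrap the dygraph model as usual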
......@@ -297,7 +297,7 @@ Reducer::Reducer(const std::vector<std::shared_ptr<imperative::VarBase>> &vars,
is_sparse_gradient_(is_sparse_gradient),
parallel_ctx_(parallel_ctx),
group_size_limits_(group_size_limits),
find_unused_vars_(find_unused_vars) {
find_unused_vars_each_step_(find_unused_vars) {
VLOG(3) << "Start construct the Reducer ...";
nrings_ = parallel_ctx->GetNRings();
nranks_ = parallel_ctx->GetNRanks();
......@@ -457,42 +457,8 @@ void Reducer::PrepareDeps(const std::unordered_set<GradOpNode *> &init_nodes) {
}
}
// After each batch is calculated, the counter of each group (group.pending_)
// and the allreduce sequence counter (next_group_) will be reset.
void Reducer::PrepareForBackward(
void Reducer::TraverseBackwardGraph(
const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
VLOG(3) << "after forward, then reset count for backward.";
next_group_ = 0;
std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
group.pending_ = group.variable_indices_.size();
group.sparse_contents_ = nullptr;
});
// reinitialize vars_marked_ready_ for next iteration
vars_marked_ready_.clear();
vars_marked_ready_.resize(vars_.size(), false);
PADDLE_ENFORCE_EQ(
groups_need_finalize_, false,
platform::errors::PreconditionNotMet(
"A serious error has occurred here. There may be several reasons: "
"1) Please note that all forward outputs derived from the module "
"parameters must participate in the calculation of losses and "
"subsequent gradient calculations. If not, the wrapper will hang, "
"waiting for autograd to generate gradients for these parameters. "
"you can use detach or stop_gradient to make the unused parameters "
"detached from the autograd graph. "
"2) Used multiple forwards and one backward. You may be able to wrap "
"multiple forwards in a model."));
// The first var to trigger the unused parameter
has_marked_unused_vars_ = false;
unused_vars_.clear();
if (!find_unused_vars_) {
return;
}
node_deps_.clear();
std::queue<std::shared_ptr<GradOpNode>> q;
std::unordered_set<VariableWrapper *> var_visited;
......@@ -554,8 +520,50 @@ void Reducer::PrepareForBackward(
<< "] is not used";
}
}
}
if (unused_vars_.empty()) {
// After each batch is calculated, the counter of each group (group.pending_)
// and the allreduce sequence counter (next_group_) will be reset.
void Reducer::PrepareForBackward(
const std::vector<std::shared_ptr<imperative::VarBase>> &outputs) {
VLOG(3) << "after forward, then reset count for backward.";
next_group_ = 0;
std::for_each(groups_.begin(), groups_.end(), [](Group &group) {
group.pending_ = group.variable_indices_.size();
group.sparse_contents_ = nullptr;
});
// reinitialize vars_marked_ready_ for next iteration
vars_marked_ready_.clear();
vars_marked_ready_.resize(vars_.size(), false);
PADDLE_ENFORCE_EQ(
groups_need_finalize_, false,
platform::errors::PreconditionNotMet(
"A serious error has occurred here. Please "
"set find_unused_parameters=True to traverse backward graph "
"in each step to prepare reduce in advance. If you have "
"set, There may be several reasons for this error: "
"1) Please note that all forward outputs derived from the module "
"parameters must participate in the calculation of losses and "
"subsequent gradient calculations. If not, the wrapper will hang, "
"waiting for autograd to generate gradients for these parameters. "
"you can use detach or stop_gradient to make the unused parameters "
"detached from the autograd graph. "
"2) Used multiple forwards and one backward. You may be able to wrap "
"multiple forwards in a model."));
// The first var whose hook fires will mark the unused parameters as ready
has_marked_unused_vars_ = false;
if (find_unused_vars_once_ || find_unused_vars_each_step_) {
unused_vars_.clear();
TraverseBackwardGraph(outputs);
// only check once in first step
find_unused_vars_once_ = false;
}
if (find_unused_vars_each_step_ && unused_vars_.empty()) {
LOG_FIRST_N(WARNING, 1)
<< "All parameters are involved in the backward pass. "
"It is recommended to set find_unused_parameters to False "
......@@ -564,7 +572,9 @@ void Reducer::PrepareForBackward(
"will occur. Please make it clear that in the subsequent "
"training, there will be no parameters that are not used "
"in the backward pass, and then set find_unused_parameters";
} else if (unused_vars_.size() == vars_.size()) {
}
if (unused_vars_.size() == vars_.size()) {
LOG_FIRST_N(WARNING, 1)
<< "There is no parameter in the device involved "
"in the backward calculation. If there are "
......@@ -595,13 +605,13 @@ void Reducer::AddDistHook(size_t var_index) {
local_used_vars_[var_index] = 1;
// rebuild group when find_unused_vars_ is false
// rebuild group when find_unused_vars_each_step_ is false
if (NeedRebuildGroup()) {
rebuild_vars_.push_back(vars_[var_index]);
rebuild_var_indices_.push_back(var_index);
}
if (!has_marked_unused_vars_ && find_unused_vars_) {
if (!has_marked_unused_vars_) {
has_marked_unused_vars_ = true;
for (const auto &unused_index : unused_vars_) {
MarkVarReady(unused_index, false);
......@@ -622,7 +632,9 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
if (vars_marked_ready_[var_index]) {
auto error_info = string::Sprintf(
"Error happened, when parameter[%d][%s] has been ready before. "
"There may be several reasons for this error: "
"Please set find_unused_parameters=True to traverse backward graph "
"in each step to prepare reduce in advance. If you have set, "
"there may be several reasons for this error: "
"1) In multiple reentrant backward phase, some parameters are reused."
"2) Using model parameters outside of forward function. Please "
"make sure that model parameters are not shared in concurrent "
......@@ -690,10 +702,16 @@ void Reducer::MarkVarReady(const size_t var_index, const bool is_used_var) {
}
} else {
// process sparse group
PADDLE_ENFORCE_EQ(HasGrad(var_index), true,
platform::errors::PreconditionNotMet(
"The sparse parameter[%d][%s] must have a gradient",
var_index, vars_[var_index]->Name()));
PADDLE_ENFORCE_EQ(
HasGrad(var_index), true,
platform::errors::PreconditionNotMet(
"The sparse parameter[%d][%s] should have a gradient. "
"Currently, DataParallel does not support sparse "
"parameters that do not generate gradients during training. "
"For example, if is_sparse=True is used in Embedding and this "
"parameter cannot generate a gradient in the current step "
"because of stop_gradient/detach, an error will occur.",
var_index, vars_[var_index]->Name()));
auto var_base = vars_[var_index]->GradVarBase();
// need to check tensor type
PADDLE_ENFORCE_EQ(
......@@ -943,7 +961,7 @@ void Reducer::FinalizeBackward() {
InitializeGroups(group_indices_);
}
if (find_unused_vars_) {
if (find_unused_vars_each_step_) {
// TODO(liuyuhui) support xpu about Tensorcopy/TensorFromVector/TensorToVector
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
ProcessUnusedDenseVars();
......
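In summary, the reducer now always traverses the backward graph in the first step, and keeps doing so on every step only when find_unused_parameters=True. A small Python pseudocode sketch of that control flow (member names mirror the C++ above; this is not the actual implementation):

class ReducerSketch:
    # pseudocode mirroring Reducer::PrepareForBackward after this change
    def __init__(self, find_unused_vars_each_step=False):
        self.find_unused_vars_each_step = find_unused_vars_each_step
        self.find_unused_vars_once = True  # the first step always traverses
        self.unused_vars = []

    def prepare_for_backward(self, outputs):
        if self.find_unused_vars_once or self.find_unused_vars_each_step:
            self.unused_vars = self.traverse_backward_graph(outputs)
            self.find_unused_vars_once = False  # later steps skip the walk
        if self.find_unused_vars_each_step and not self.unused_vars:
            # corresponds to the LOG_FIRST_N warning: every parameter was
            # used, so per-step detection only costs performance
            pass

    def traverse_backward_graph(self, outputs):
        # stand-in for TraverseBackwardGraph: returns indices of parameters
        # that will not receive gradients from these outputs
        return []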
......@@ -162,13 +162,16 @@ class Reducer {
std::vector<std::vector<size_t>> RebuildGruops();
inline bool NeedRebuildGroup() {
return !has_rebuilt_group_ && !find_unused_vars_;
return !has_rebuilt_group_ && !find_unused_vars_each_step_;
}
void ProcessUnusedDenseVars();
bool HasGrad(size_t var_index);
void TraverseBackwardGraph(
const std::vector<std::shared_ptr<imperative::VarBase>>& outputs);
private:
std::vector<std::shared_ptr<imperative::VarBase>> vars_;
std::vector<std::vector<size_t>> group_indices_;
......@@ -195,7 +198,8 @@ class Reducer {
std::unordered_map<VariableWrapper*, size_t> var_index_map_;
std::vector<size_t> unused_vars_;
bool has_marked_unused_vars_{false};
bool find_unused_vars_{false};
bool find_unused_vars_each_step_{false};
bool find_unused_vars_once_{true};
bool groups_need_finalize_{false};
#ifdef PADDLE_WITH_XPU_BKCL
// comm_pool_ is used for scheduling allreduce in multi Kunlun cards training.
......
......@@ -626,7 +626,7 @@ class DistributedStrategy(object):
Indicating whether we are using find_unused_parameters to
find unused parameters in DataParallel.
Default value: True
Default value: False
Examples:
......
......@@ -417,14 +417,15 @@ class DataParallel(layers.Layer):
Note that setting the find_unused_parameters to True
will affect computing performance. Therefore, if all parameters
are sure to participate in the loss calculation and the
autograd graph construction, please set it False. Default: True.
autograd graph construction, please set it False. Default: False.
Returns:
Layer: The data paralleled module.
Examples:
.. code-block:: python
# required: distributed
import paddle
import paddle.nn as nn
import paddle.optimizer as opt
......@@ -474,7 +475,7 @@ class DataParallel(layers.Layer):
strategy=None,
comm_buffer_size=25,
last_comm_buffer_size=1,
find_unused_parameters=True):
find_unused_parameters=False):
super(DataParallel,
self).__init__(layers.full_name() + "_data_parallel")
......@@ -576,12 +577,8 @@ class DataParallel(layers.Layer):
def forward(self, *inputs, **kwargs):
outputs = self._layers(*inputs, **kwargs)
if self._strategy.nranks > 1 and framework._dygraph_tracer()._has_grad:
if self.find_unused_parameters:
self._reducer.prepare_for_backward(
list(self._find_varbase(outputs)))
else:
self._reducer.prepare_for_backward(list(self._find_varbase([])))
self._reducer.prepare_for_backward(
list(self._find_varbase(outputs)))
return outputs
@deprecated(
......
......@@ -74,8 +74,8 @@ class TestDistTraning(unittest.TestCase):
state_dict = model_a.state_dict()
model_b.set_state_dict(state_dict)
model_a = paddle.DataParallel(model_a)
model_b = paddle.DataParallel(model_b)
model_a = paddle.DataParallel(model_a, find_unused_parameters=True)
model_b = paddle.DataParallel(model_b, find_unused_parameters=True)
ones_input = paddle.ones(shape=(batch, in_dim))
ones_input.stop_gradient = True
......
......@@ -27,6 +27,7 @@ from test_dist_base import RUN_STEP
class SpawnAssistTestArgs(object):
update_method = "local"
trainer_id = 0
find_unused_parameters = False
class TestDistSpawnRunner(unittest.TestCase):
......
......@@ -548,7 +548,10 @@ class TestParallelDyGraphRunnerBase(object):
# 4. train model
model, train_reader, opt = self.get_model()
if args.update_method == "nccl2":
model = paddle.DataParallel(model)
if args.find_unused_parameters:
model = paddle.DataParallel(model, find_unused_parameters=True)
else:
model = paddle.DataParallel(model, find_unused_parameters=False)
out_losses = []
for step_id, data in enumerate(train_reader()):
......@@ -581,8 +584,8 @@ class TestParallelDyGraphRunnerBase(object):
# set strategy
strategy = fleet.DistributedStrategy()
if not args.find_unused_parameters:
strategy.find_unused_parameters = False
if args.find_unused_parameters:
strategy.find_unused_parameters = True
# 3. init parallel env
if args.update_method == "nccl2" or args.update_method == "bkcl":
......@@ -737,7 +740,7 @@ class TestDistBase(unittest.TestCase):
self._save_model = False
self._fuse_all_reduce = None
self._accumulate_gradient = False
self._find_unused_parameters = True
self._find_unused_parameters = False
self._setup_config()
global DIST_UT_PORT
......
......@@ -30,6 +30,7 @@ class TestDygraphControlFlowSame(TestDistBase):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
self._find_unused_parameters = True
def test_net(self):
if fluid.core.is_compiled_with_cuda():
......@@ -46,6 +47,7 @@ class TestFleetDygraphControlFlowSame(TestDygraphControlFlowSame):
self._nccl2_mode = True
self._dygraph = True
self._use_fleet_api = True
self._find_unused_parameters = True
class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame):
......@@ -54,6 +56,7 @@ class TestFleetDygraphControlFlowSameAccGrad(TestDygraphControlFlowSame):
self._nccl2_mode = True
self._dygraph = True
self._accumulate_gradient = True
self._find_unused_parameters = True
class TestDygraphControlFlowDiff(TestDistBase):
......@@ -61,6 +64,7 @@ class TestDygraphControlFlowDiff(TestDistBase):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
self._find_unused_parameters = True
def test_net(self):
if fluid.core.is_compiled_with_cuda():
......@@ -77,6 +81,7 @@ class TestFleetDygraphControlFlowDiff(TestDygraphControlFlowDiff):
self._nccl2_mode = True
self._dygraph = True
self._use_fleet_api = True
self._find_unused_parameters = True
class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff):
......@@ -85,6 +90,7 @@ class TestFleetDygraphControlFlowDiffAccGrad(TestDygraphControlFlowDiff):
self._nccl2_mode = True
self._dygraph = True
self._accumulate_gradient = True
self._find_unused_parameters = True
if __name__ == "__main__":
......
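The test classes above differ only in their _setup_config flags; consolidated, each control-flow case looks roughly like the sketch below (the class name and the omitted check_with_place call are illustrative, not the actual test code):

import unittest
import paddle.fluid as fluid
from test_dist_base import TestDistBase

class TestDygraphControlFlowCase(TestDistBase):
    def _setup_config(self):
        self._sync_mode = False
        self._nccl2_mode = True
        self._dygraph = True
        # control-flow models can leave parameters unused in a step,
        # so these cases opt back into per-step detection
        self._find_unused_parameters = True

    def test_net(self):
        if fluid.core.is_compiled_with_cuda():
            # the real cases call self.check_with_place(...) here with the
            # corresponding parallel_dygraph_control_flow_* entry script
            pass

if __name__ == "__main__":
    unittest.main()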
......@@ -31,6 +31,7 @@ class TestParallelDygraphMnist(TestDistBase):
self._sync_mode = False
self._nccl2_mode = True
self._dygraph = True
self._find_unused_parameters = True
def test_mnist(self):
if fluid.core.is_compiled_with_cuda():
......