Unverified commit 8ed33bf9, authored by chengduo, committed by GitHub

Fix Bug-prone code of PE (#18354)

* update pe reduce config
test=develop

* drop the local_exe_scopes of the previous parallel_executor
test=develop
Parent 999d9a59
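What the first bullet fixes: ParallelExecutor's constructor copies the incoming build_strategy into member_->build_strategy_ and then mutates the copy (for example, it may flip enable_parallel_graph_), but most of the constructor kept reading the original, now potentially stale, parameter. The diff below routes every later read through member_->build_strategy_ so there is a single source of truth. Here is a minimal self-contained sketch of the hazard; Config and Engine are made-up names, not PaddlePaddle code (a usage sketch for the second bullet follows the Python hunk at the end).

#include <iostream>

struct Config {
  bool parallel_graph = false;
};

class Engine {
 public:
  explicit Engine(const Config &cfg) : cfg_(cfg) {  // cfg is copied into cfg_
    // The decision is recorded on the member copy only...
    cfg_.parallel_graph = true;
    // ...so reading the constructor parameter afterwards sees a stale value:
    std::cout << "stale parameter: " << cfg.parallel_graph << "\n";   // prints 0
    // The commit's pattern: route every later read through the member copy.
    std::cout << "member copy:     " << cfg_.parallel_graph << "\n";  // prints 1
  }

 private:
  Config cfg_;
};

int main() {
  Config cfg;
  Engine engine(cfg);
  return 0;
}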
@@ -311,8 +311,8 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
   member_->global_scope_ = scope;
   member_->use_cuda_ = exec_strategy.use_cuda_;
   member_->build_strategy_ = build_strategy;
-  member_->use_all_reduce_ =
-      build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;
+  member_->use_all_reduce_ = member_->build_strategy_.reduce_ ==
+                             BuildStrategy::ReduceStrategy::kAllReduce;
   member_->nranks_ = build_strategy.num_trainers_ * places.size();
   if (!member_->use_all_reduce_ && member_->nranks_ == 1) {
     LOG(INFO) << "If you set build_strategy.reduce with 'Reduce',"
@@ -350,7 +350,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
   }
   std::vector<ir::Graph *> graphs;
-  if (build_strategy.async_mode_) {
+  if (member_->build_strategy_.async_mode_) {
     PADDLE_ENFORCE(!member_->use_cuda_,
                    "gpu mode does not support async_mode_ now!");
     graphs.push_back(graph);
@@ -364,9 +364,10 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
   // FIXME(Yancey1989): parallel graph mode get better performance
   // in GPU allreduce distributed training. Need an elegant way to
   // choice the execution strategy.
-  build_strategy.enable_parallel_graph_ =
-      EnableParallelGraphExecution(*graph, exec_strategy, build_strategy);
-  if (build_strategy.enable_parallel_graph_) {
+  member_->build_strategy_.enable_parallel_graph_ =
+      EnableParallelGraphExecution(*graph, exec_strategy,
+                                   member_->build_strategy_);
+  if (member_->build_strategy_.enable_parallel_graph_) {
     LOG(INFO) << "The Executor would execute the graph by ParallelGraph "
                  "Execution which can get better performance,"
               << "you can force it off by env FLAGS_enable_parallel_graph=0";
@@ -374,7 +375,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
   if (member_->use_cuda_ && member_->nranks_ > 1) {
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
-    member_->InitOrGetNCCLCommunicator(scope, build_strategy);
+    member_->InitOrGetNCCLCommunicator(scope, member_->build_strategy_);
     // Initialize device context's nccl comm, will be used by normal
     // Operators like sync_batch_norm, and collective ops.
@@ -397,7 +398,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
   }
   // broadcast parameters from the 0th device to others:
   auto need_broadcast = [&]() -> bool {
-    if (build_strategy.num_trainers_ > 1) {
+    if (member_->build_strategy_.num_trainers_ > 1) {
       // 1. num_tariners would be grater than 1 for nccl distributed training.
       return true;
     } else if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
@@ -409,7 +410,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
   };
   // Bcast Parameters to all GPUs
   if (need_broadcast()) {
-    BCastParamsToDevices(bcast_vars, build_strategy.trainer_id_);
+    BCastParamsToDevices(bcast_vars, member_->build_strategy_.trainer_id_);
   }
   // Startup Program has been run. All local scopes has correct parameters.
@@ -418,39 +419,40 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
   // ncclOp
   std::vector<ir::Graph *> async_graphs(places.size());
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
-  if (build_strategy.async_mode_) {
+  if (member_->build_strategy_.async_mode_) {
     VLOG(3) << "use local async mode";
-    graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name,
-                                 {member_->local_scopes_[0]}, 1,
-                                 member_->use_cuda_, member_->nccl_ctxs_);
+    graph = member_->build_strategy_.Apply(
+        graph, {member_->places_[0]}, loss_var_name,
+        {member_->local_scopes_[0]}, 1, member_->use_cuda_,
+        member_->nccl_ctxs_);
     for (size_t i = 1; i < member_->places_.size(); ++i) {
-      graphs[i] =
-          build_strategy.Apply(graphs[i], {member_->places_[i]}, loss_var_name,
-                               {member_->local_scopes_[i]}, 1,
-                               member_->use_cuda_, member_->nccl_ctxs_);
+      graphs[i] = member_->build_strategy_.Apply(
+          graphs[i], {member_->places_[i]}, loss_var_name,
+          {member_->local_scopes_[i]}, 1, member_->use_cuda_,
+          member_->nccl_ctxs_);
       async_graphs[i] = graphs[i];
     }
   } else {
-    graph = build_strategy.Apply(graph, member_->places_, loss_var_name,
-                                 member_->local_scopes_, member_->nranks_,
-                                 member_->use_cuda_, member_->nccl_ctxs_);
+    graph = member_->build_strategy_.Apply(
+        graph, member_->places_, loss_var_name, member_->local_scopes_,
+        member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_);
   }
 #else
-  if (build_strategy.async_mode_) {
+  if (member_->build_strategy_.async_mode_) {
     VLOG(3) << "use local async mode";
-    graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name,
-                                 {member_->local_scopes_[0]}, 1,
-                                 member_->use_cuda_);
+    graph = member_->build_strategy_.Apply(
+        graph, {member_->places_[0]}, loss_var_name,
+        {member_->local_scopes_[0]}, 1, member_->use_cuda_);
     for (size_t i = 1; i < member_->places_.size(); ++i) {
-      graphs[i] = build_strategy.Apply(
+      graphs[i] = member_->build_strategy_.Apply(
           graphs[i], {member_->places_[i]}, loss_var_name,
           {member_->local_scopes_[i]}, 1, member_->use_cuda_);
       async_graphs[i] = graphs[i];
     }
   } else {
-    graph = build_strategy.Apply(graph, member_->places_, loss_var_name,
-                                 member_->local_scopes_, member_->nranks_,
-                                 member_->use_cuda_);
+    graph = member_->build_strategy_.Apply(
+        graph, member_->places_, loss_var_name, member_->local_scopes_,
+        member_->nranks_, member_->use_cuda_);
   }
 #endif
@@ -491,11 +493,11 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
     }
   }
-  if (build_strategy.async_mode_) {
+  if (member_->build_strategy_.async_mode_) {
     VLOG(3) << "use AsyncSSAGraphExecutor";
     member_->executor_.reset(new details::AsyncSSAGraphExecutor(
         exec_strategy, member_->local_scopes_, member_->places_, async_graphs));
-  } else if (build_strategy.enable_parallel_graph_) {
+  } else if (member_->build_strategy_.enable_parallel_graph_) {
     VLOG(3) << "use ParallelSSAGraphExecutor";
 #ifdef PADDLE_WITH_CUDA
     // TODO(Yancey1989): Remove passing in the main_program when
@@ -519,7 +521,7 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
   }
   VLOG(3) << "use ScopeBufferedSSAGraphExecutor";
-  if (!build_strategy.async_mode_) {
+  if (!member_->build_strategy_.async_mode_) {
     member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
         exec_strategy, member_->local_scopes_, std::move(var_infos),
         member_->places_, std::move(member_->executor_)));
...
@@ -275,6 +275,8 @@ class CompiledProgram(object):
                     "share_vars_from is not compiled and run, so there is no "
                     "var to share.")
             self._local_scopes = self._share_vars_from._executor.local_scopes()
+            # drop the local_exe_scopes of the previous parallel_executor
+            self._share_vars_from._executor.drop_local_exe_scopes()
         else:
             assert scope is not None, ""
             self._local_scopes = []
...
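And the second bullet: when a new CompiledProgram reuses another program's variables via share_vars_from, the previous parallel executor's per-device local execution scopes are now dropped as soon as the new program is compiled (the added drop_local_exe_scopes() call above) instead of lingering until that executor is destroyed. A hypothetical usage sketch against the fluid 1.x API; the program, layer, and variable names are made up, while CompiledProgram, with_data_parallel, share_vars_from, and drop_local_exe_scopes come from the diff:

import os
os.environ['CPU_NUM'] = '2'  # run data-parallel on two CPU "devices"

import numpy as np
import paddle.fluid as fluid

train_prog, startup_prog = fluid.Program(), fluid.Program()
with fluid.program_guard(train_prog, startup_prog):
    x = fluid.layers.data(name='x', shape=[4], dtype='float32')
    y = fluid.layers.data(name='y', shape=[1], dtype='float32')
    pred = fluid.layers.fc(input=x, size=1)
    loss = fluid.layers.mean(fluid.layers.square_error_cost(pred, y))
    test_prog = train_prog.clone(for_test=True)  # clone before minimize
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup_prog)

compiled_train = fluid.CompiledProgram(train_prog).with_data_parallel(
    loss_name=loss.name)
feed = {'x': np.random.rand(8, 4).astype('float32'),
        'y': np.random.rand(8, 1).astype('float32')}
exe.run(compiled_train, feed=feed, fetch_list=[loss.name])

# share_vars_from reuses compiled_train's parameters. With this commit, the
# first run of compiled_test (which triggers its compilation) also calls
# drop_local_exe_scopes() on the training executor, so its per-device scopes
# no longer hold stale state alive.
compiled_test = fluid.CompiledProgram(test_prog).with_data_parallel(
    share_vars_from=compiled_train)
exe.run(compiled_test, feed=feed, fetch_list=[loss.name])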