Unverified · Commit faa8c703 authored by Aurelius84, committed by GitHub

Polish ParallelExecutor constructor into small functions (#32191)

* Refine Constructor logic of ParallelExecutor

* refine function name

* refine code comment
Parent 39a59dcf
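At a glance, the refactored constructor now just sequences a handful of small private helpers. The outline below is a condensed, non-compilable sketch assembled from the added lines in this diff (argument lists abbreviated; InitP2P, reader-queue setup, and the parameter broadcast step are elided):

```cpp
// Condensed outline of the new ParallelExecutor constructor in this PR.
// All helper names are taken from the diff; their bodies live in the new
// private member functions added further down.
ParallelExecutor::ParallelExecutor(/* places, bcast_vars, loss_var_name,
                                      scope, local_scopes, exec_strategy,
                                      build_strategy, graph */) {
  // Fill member_: device type, build strategy, nranks, parallel-graph flag.
  InitExecutorPrivateMemberInfo(exec_strategy, build_strategy, places.size(),
                                *graph);

  // Step 1. Per-device scopes, async-mode graph clones, NCCL/BKCL comms.
  CreateLocalScopes(scope, local_scopes, /*create_new*/ true);
  std::vector<ir::Graph *> graphs = CloneGraphToMultiDevices(graph);
  PrepareNCCLCommunicator(scope);

  // Step 2. Apply the build strategy and memory-optimize passes.
  std::vector<ir::Graph *> async_graphs =
      CompileGraphWithBuildStrategy(graph, &graphs, loss_var_name);
  graph = member_->ApplyMemoryOptimizePass(graph);

  // Step 3. Collect variable infos and create per-scope execution scopes.
  std::vector<details::VariableInfo> var_infos;
  CreateVariableInfos(&var_infos, graph);
  auto scope_map =
      CreateLocalExecScopes(member_->local_scopes_, /*create_new*/ true);

  // Step 4. Build the SSA graph executor and wire scopes/reader devices.
  std::vector<ir::Graph *> final_graphs =
      CreateSSAGraphExecutor(exec_strategy, &async_graphs, graph);
  ResetOpHandleScopeMapOfGraphs(final_graphs, scope_map);
  SetReaderOpDeviceInfoOfGraphs(final_graphs);
}
```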
......@@ -27,7 +27,6 @@ limitations under the License. */
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
......@@ -631,141 +630,15 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
InitP2P(places);
ir::InitReaderQueueDeviceCount(graph, *(member_->global_scope_),
member_->places_.size());
  // Initialize necessary info of member_ with strategy.
  InitExecutorPrivateMemberInfo(exec_strategy, build_strategy, places.size(),
                                *graph);

  // Step 1. Create local scopes and Clone graph into multi device
  CreateLocalScopes(scope, local_scopes, /*create_new*/ true);
  std::vector<ir::Graph *> graphs = CloneGraphToMultiDevices(graph);
  PrepareNCCLCommunicator(scope);

// broadcast parameters from the 0th device to others:
auto need_broadcast = [&]() -> bool {
if (member_->build_strategy_.num_trainers_ > 1) {
......@@ -778,259 +651,77 @@ ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
}
return false;
};
// Bcast Parameters to all GPUs
if (need_broadcast()) {
BCastParamsToDevices(bcast_vars, member_->build_strategy_.trainer_id_);
}
  // Startup Program has been run. All local scopes has correct parameters.

  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
  // ncclOp
  std::vector<ir::Graph *> async_graphs =
      CompileGraphWithBuildStrategy(graph, &graphs, loss_var_name);

  graph = member_->ApplyMemoryOptimizePass(graph);
  async_graphs[0] = graph;

  // Step 3. Create vars in each scope. Passes may also create new vars.
  //         skip control vars and empty vars
  std::vector<details::VariableInfo> var_infos;
  CreateVariableInfos(&var_infos, graph);
  std::unordered_map<Scope *, Scope *> scope_map =
      CreateLocalExecScopes(member_->local_scopes_, /*create_new*/ true);

  // Step 4. Create SSAGraph executor
  std::vector<ir::Graph *> final_graphs =
      CreateSSAGraphExecutor(exec_strategy, &async_graphs, graph);

  VLOG(3) << "use ScopeBufferedSSAGraphExecutor";
  if (!member_->build_strategy_.async_mode_) {
    member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->local_exec_scopes_,
        std::move(var_infos), member_->places_, std::move(member_->executor_)));
  }

  ResetOpHandleScopeMapOfGraphs(final_graphs, scope_map);
  SetReaderOpDeviceInfoOfGraphs(final_graphs);
}
void ParallelExecutor::BCastParamsToDevices(
const std::vector<std::string> &vars, int trainer_id) const {
VLOG(3) << "BCastParamsToDevices";
// the initializing bcast, all vars would be bcast from device(0).
for (auto &var : vars) {
framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
if (main_var == nullptr || !main_var->IsType<LoDTensor>()) {
continue;
}
auto &main_tensor = main_var->Get<LoDTensor>();
if (!main_tensor.IsInitialized()) {
VLOG(3) << "one in var not inited, return!";
continue;
}
auto &dims = main_tensor.dims();
if (paddle::platform::is_gpu_place(main_tensor.place())) {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
std::vector<void *> buffers;
buffers.reserve(member_->places_.size());
size_t numel = main_tensor.numel();
ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type());
for (size_t i = 0; i < member_->places_.size(); ++i) {
auto place = member_->places_[i];
void *buffer;
if (i == 0 && trainer_id == 0) {
buffer = const_cast<void *>(main_tensor.data<void>());
} else {
auto local_scope = member_->local_scopes_[i];
auto *t = local_scope->Var(var)->GetMutable<LoDTensor>();
t->Resize(dims);
buffer = t->mutable_data(place, main_tensor.type());
}
buffers.push_back(buffer);
}
PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(),
platform::errors::PreconditionNotMet(
"variables' buffer size to bcast is %d, which is "
"NOT equal to places size %d",
......@@ -1367,6 +1058,399 @@ bool ParallelExecutor::EnableParallelGraphExecution(
return enable_parallel_graph;
}
void ParallelExecutor::InitExecutorPrivateMemberInfo(
const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
size_t device_count, const ir::Graph &graph) {
member_->use_device_ = exec_strategy.use_device_;
member_->build_strategy_ = build_strategy;
member_->use_all_reduce_ = member_->build_strategy_.reduce_ ==
BuildStrategy::ReduceStrategy::kAllReduce;
member_->nranks_ = build_strategy.num_trainers_ * device_count;
if (!member_->use_all_reduce_ && member_->nranks_ == 1) {
LOG(INFO) << "If you set build_strategy.reduce with 'Reduce',"
"the number of places should be greater than 1.";
member_->build_strategy_.reduce_ =
BuildStrategy::ReduceStrategy::kAllReduce;
member_->use_all_reduce_ = true;
}
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && defined(_WIN32)
if (member_->IsUseCUDA(member_->use_device_)) {
PADDLE_ENFORCE_EQ(
device_count, 1,
platform::errors::Unavailable("Windows can support Single GPU only."));
}
#endif
#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \
(!defined(PADDLE_WITH_NCCL) && !defined(PADDLE_WITH_RCCL))
if (member_->IsUseCUDA(member_->use_device_)) {
PADDLE_ENFORCE_EQ(
device_count, 1,
platform::errors::PermissionDenied(
"Your machine has multiple cards, "
"but the WITH_NCCL option is not turned on during compilation, "
"and you cannot use multi-card training or prediction. "
"Please recompile and turn on the WITH_NCCL option."));
}
#endif
std::string device_name;
if (member_->use_device_ == p::kCPU) {
device_name = "CPU";
} else if (member_->use_device_ == p::kCUDA) {
device_name = "CUDA";
} else {
device_name = "XPU";
}
VLOG(1) << string::Sprintf(
"The Program will be executed on %s using ParallelExecutor, %lu "
"cards are used, so %lu programs are executed in parallel.",
device_name, device_count, device_count);
  // FIXME(Yancey1989): parallel graph mode gets better performance
  // in GPU allreduce distributed training. Need an elegant way to
  // choose the execution strategy.
member_->build_strategy_.enable_parallel_graph_ =
EnableParallelGraphExecution(graph, exec_strategy,
member_->build_strategy_);
if (member_->build_strategy_.enable_parallel_graph_) {
LOG(INFO) << "The Executor would execute the graph by ParallelGraph "
"Execution which can get better performance,"
<< "you can force it off by env FLAGS_enable_parallel_graph=0";
}
}
void ParallelExecutor::CreateLocalScopes(
Scope *global_scope, const std::vector<Scope *> &local_scopes,
bool create_new) {
if (local_scopes.empty()) {
member_->own_local_scope_ = true;
member_->local_scopes_.emplace_back(global_scope);
for (size_t i = 1; i < member_->places_.size(); ++i) {
member_->local_scopes_.emplace_back(&global_scope->NewScope());
}
} else {
member_->own_local_scope_ = false;
PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size(),
platform::errors::PreconditionNotMet(
"member_->places_.size() = %d is not equal to "
"local_scopes.size() = %d",
member_->places_.size(), local_scopes.size()));
for (size_t i = 0; i < member_->places_.size(); ++i) {
if (create_new) {
member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope());
} else {
// Use local scopes directly
member_->local_scopes_.emplace_back(local_scopes[i]);
}
}
}
}
std::unordered_map<Scope *, Scope *> ParallelExecutor::CreateLocalExecScopes(
const std::vector<Scope *> &local_scopes, bool create_new) {
std::unordered_map<Scope *, Scope *> scope_map;
for (auto *scope : local_scopes) {
Scope *local_exec_scope = scope;
if (create_new) {
local_exec_scope = &scope->NewScope();
}
member_->local_exec_scopes_.emplace_back(local_exec_scope);
scope_map.emplace(scope, local_exec_scope);
}
PADDLE_ENFORCE_EQ(
member_->local_scopes_.size(), member_->local_exec_scopes_.size(),
platform::errors::PreconditionNotMet(
"member_->local_scopes_.size() = %d is not equal to "
"member_->local_exec_scopes_.size() = %d",
member_->local_scopes_.size(), member_->local_exec_scopes_.size()));
return scope_map;
}
std::vector<ir::Graph *> ParallelExecutor::CloneGraphToMultiDevices(
ir::Graph *graph) {
std::vector<ir::Graph *> graphs;
if (member_->build_strategy_.async_mode_) {
PADDLE_ENFORCE_EQ(member_->IsUseCUDA(member_->use_device_), false,
platform::errors::Unavailable(
"gpu mode does not support async_mode_ now!"));
graphs.push_back(graph);
for (size_t i = 1; i < member_->places_.size(); ++i) {
auto *tmp_graph = new ir::Graph(graph->OriginProgram());
async_graphs_.emplace_back(tmp_graph);
graphs.push_back(tmp_graph);
}
}
return graphs;
}
void ParallelExecutor::PrepareNCCLCommunicator(Scope *global_scope) {
if (member_->IsUseCUDA(member_->use_device_) && member_->nranks_ > 1) {
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
member_->InitOrGetNCCLCommunicator(global_scope, &member_->build_strategy_);
// Initialize device context's nccl comm, will be used by normal
// Operators like sync_batch_norm, and collective ops.
  // NOTE: if more than one ParallelExecutor runs on the same place, the nccl
  // comm will be rewritten and cause problems.
// NOTE: NCCL group-calls and non-group-calls can not use the same
// NCCL communicator, so for ParallelGraph and Multi-Process mode, re-use
// same communicators.
auto *nccl_ctxs = member_->nccl_ctxs_->GetSyncBatchNormCtx(
global_scope, member_->places_);
auto &pool = platform::DeviceContextPool::Instance();
for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) {
auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(
pool.Get(member_->places_[dev_id]));
auto &nccl_ctx = nccl_ctxs->at(member_->places_[dev_id]);
dev_ctx->set_nccl_comm(nccl_ctx.comm());
}
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with CUDA."));
#endif
}
if (member_->use_device_ == p::kXPU && member_->nranks_ > 1) {
#if defined(PADDLE_WITH_XPU_BKCL)
member_->InitOrGetBKCLCommunicator(global_scope, member_->build_strategy_);
auto *bkcl_ctxs = member_->bkcl_ctxs_->GetSyncBatchNormCtx(
global_scope, member_->places_);
auto &pool = platform::DeviceContextPool::Instance();
for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) {
auto *dev_ctx = static_cast<platform::XPUDeviceContext *>(
pool.Get(member_->places_[dev_id]));
auto &bkcl_ctx = bkcl_ctxs->at(member_->places_[dev_id]);
dev_ctx->set_bkcl_context(bkcl_ctx.comm());
}
#else
PADDLE_THROW(
platform::errors::PreconditionNotMet("Not compiled with XPU."));
#endif
}
}
std::vector<ir::Graph *> ParallelExecutor::CompileGraphWithBuildStrategy(
ir::Graph *graph, std::vector<ir::Graph *> *device_graphs,
const std::string &loss_var_name) {
auto device_count = member_->places_.size();
std::vector<ir::Graph *> async_graphs(device_count);
auto &graphs = *device_graphs;
#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL)
if (member_->build_strategy_.async_mode_) {
PADDLE_ENFORCE_EQ(graphs.size(), device_count,
platform::errors::PreconditionNotMet(
"graphs.size() shoule be %d, but received %d",
device_count, graphs.size()));
VLOG(3) << "use local async mode";
graph = member_->build_strategy_.Apply(
graph, {member_->places_[0]}, loss_var_name,
{member_->local_scopes_[0]}, 1, member_->use_device_,
member_->nccl_ctxs_);
for (size_t i = 1; i < device_count; ++i) {
graphs[i] = member_->build_strategy_.Apply(
graphs[i], {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, 1, member_->use_device_,
member_->nccl_ctxs_);
async_graphs[i] = graphs[i];
}
} else {
graph = member_->build_strategy_.Apply(
graph, member_->places_, loss_var_name, member_->local_scopes_,
member_->nranks_, member_->use_device_, member_->nccl_ctxs_);
}
#elif defined(PADDLE_WITH_XPU_BKCL)
if (member_->build_strategy_.async_mode_) {
PADDLE_ENFORCE_EQ(graphs.size(), device_count,
platform::errors::PreconditionNotMet(
"graphs.size() shoule be %d, but received %d",
device_count, graphs.size()));
VLOG(3) << "use local async mode";
graph = member_->build_strategy_.Apply(
graph, {member_->places_[0]}, loss_var_name,
{member_->local_scopes_[0]}, 1, member_->use_device_,
member_->bkcl_ctxs_);
for (size_t i = 1; i < device_count; ++i) {
graphs[i] = member_->build_strategy_.Apply(
graphs[i], {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, 1, member_->use_device_,
member_->bkcl_ctxs_);
async_graphs[i] = graphs[i];
}
} else {
graph = member_->build_strategy_.Apply(
graph, member_->places_, loss_var_name, member_->local_scopes_,
member_->nranks_, member_->use_device_, member_->bkcl_ctxs_);
}
#else
if (member_->build_strategy_.async_mode_) {
VLOG(3) << "use local async mode";
graph = member_->build_strategy_.Apply(
graph, {member_->places_[0]}, loss_var_name,
{member_->local_scopes_[0]}, 1, member_->use_device_);
for (size_t i = 1; i < device_count; ++i) {
graphs[i] = member_->build_strategy_.Apply(
graphs[i], {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, 1, member_->use_device_);
async_graphs[i] = graphs[i];
}
} else {
graph = member_->build_strategy_.Apply(
graph, member_->places_, loss_var_name, member_->local_scopes_,
member_->nranks_, member_->use_device_);
}
#endif
return async_graphs;
}
void ParallelExecutor::CreateVariableInfos(
std::vector<details::VariableInfo> *var_infos, ir::Graph *graph) {
PADDLE_ENFORCE_EQ(
var_infos->size(), 0,
platform::errors::PreconditionNotMet(
"var_infos->size() shoule be 0, but received %d", var_infos->size()));
PADDLE_ENFORCE_EQ(
member_->is_persistable_.size(), 0,
platform::errors::PreconditionNotMet(
"member_->is_persistable_.size() shoule be 0, but received %d",
member_->is_persistable_.size()));
for (auto &node : graph->Nodes()) {
if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
var_infos->emplace_back();
var_infos->back().name_ = node->Var()->Name();
var_infos->back().type_ = node->Var()->GetType();
var_infos->back().persistable_ = node->Var()->Persistable();
member_->is_persistable_.emplace(node->Var()->Name(),
node->Var()->Persistable());
}
}
if (graph->Has(details::kFusedVars)) {
auto &fused_vars = graph->Get<details::FusedVars>(details::kFusedVars);
for (auto &fused_var : fused_vars) {
var_infos->emplace_back();
var_infos->back() = fused_var.second;
member_->is_persistable_.emplace(fused_var.first,
fused_var.second.persistable_);
}
}
}
std::vector<ir::Graph *> ParallelExecutor::CreateSSAGraphExecutor(
const ExecutionStrategy &exec_strategy,
std::vector<ir::Graph *> *async_graphs, ir::Graph *graph) {
std::vector<ir::Graph *> final_graphs;
if (member_->build_strategy_.async_mode_) {
VLOG(3) << "use AsyncSSAGraphExecutor";
member_->executor_.reset(new details::AsyncSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->local_exec_scopes_,
member_->places_, *async_graphs));
final_graphs = *async_graphs;
} else if (member_->build_strategy_.enable_parallel_graph_) {
VLOG(3) << "use ParallelSSAGraphExecutor";
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
// TODO(Yancey1989): Remove passing in the main_program when
// allreduce_seq_pass doesn't need it as the attr.
bool is_inference = details::IsDataParallelInferenceGraph(*graph);
bool has_drop_last_read_op = details::HasDropLastReadOp(*graph);
auto *pg_exe = new details::ParallelSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->local_exec_scopes_,
member_->places_, graph);
final_graphs = pg_exe->Graphs();
member_->executor_.reset(pg_exe);
if (is_inference && member_->places_.size() > 1) {
member_->inference_executor_ = pg_exe;
if (!has_drop_last_read_op) {
VLOG(5) << "Enable partial feed support in inference phase";
pg_exe->EnablePartialFeedSupport();
}
}
#else
PADDLE_THROW(platform::errors::PreconditionNotMet(
"Paddle should be compiled with CUDA for ParallelGraph Execution."));
#endif
} else {
bool has_drop_last_read_op = details::HasDropLastReadOp(*graph);
auto possible_inference_graphs =
details::TrySeparateToMultipleSingleDeviceGraphs(graph);
if (!possible_inference_graphs.empty()) {
VLOG(5) << "Use ParallelSSAGraphExecutor in inference phase";
auto *pg_exe = new details::ParallelSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->local_exec_scopes_,
member_->places_, std::move(possible_inference_graphs));
if (!has_drop_last_read_op) {
VLOG(5) << "Enable partial feed support in inference phase";
pg_exe->EnablePartialFeedSupport();
}
final_graphs = pg_exe->Graphs();
member_->executor_.reset(pg_exe);
member_->inference_executor_ = pg_exe;
} else {
LOG_IF(WARNING, details::HasKeepLastReadOp(*graph))
<< "drop_last=False for DataLoader is not supported in training "
"network. It is automatically turned to drop_last=True.";
if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
VLOG(3) << "use ThreadedSSAGraphExecutor";
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->local_exec_scopes_,
member_->places_, graph));
} else {
VLOG(3) << "use FastThreadedSSAGraphExecutor";
member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->local_exec_scopes_,
member_->places_, graph));
}
final_graphs.emplace_back(graph);
}
}
return final_graphs;
}
void ParallelExecutor::ResetOpHandleScopeMapOfGraphs(
const std::vector<ir::Graph *> &final_graphs,
const std::unordered_map<Scope *, Scope *> &scope_map) {
PADDLE_ENFORCE_GE(
final_graphs.size(), 1,
platform::errors::PreconditionNotMet(
"final_graphs shoule contain at least one graph, but received %d",
final_graphs.size()));
PADDLE_ENFORCE_GT(scope_map.size(), 0,
platform::errors::PreconditionNotMet(
"scope_map shoule contain at least one "
"element, but received %d",
scope_map.size()));
for (auto *g : final_graphs) {
auto ops = ir::FilterByNodeWrapper<details::OpHandleBase>(*g);
for (auto *op : ops) {
op->SetLocalExecScopes(scope_map);
}
}
}
void ParallelExecutor::SetReaderOpDeviceInfoOfGraphs(
const std::vector<ir::Graph *> &final_graphs) {
if (final_graphs.size() == 1) {
ir::SetReaderOpDeviceInfo(final_graphs[0], member_->places_.size());
} else {
for (size_t i = 0; i < final_graphs.size(); ++i) {
ir::SetReaderOpDeviceInfo(final_graphs[i], member_->places_.size(), i);
}
}
}
const ir::Graph &ParallelExecutor::Graph() const {
return member_->executor_->Graph();
}
......
......@@ -24,6 +24,7 @@ limitations under the License. */
#include "paddle/fluid/framework/details/build_strategy.h"
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/executor.h"
#include "paddle/fluid/framework/feed_fetch_type.h"
#include "paddle/fluid/framework/op_info.h"
......@@ -41,6 +42,7 @@ namespace framework {
class ParallelExecutorPrivate;
using details::VariableInfo;
using details::BuildStrategy;
using details::ExecutionStrategy;
namespace p = paddle::platform;
......@@ -93,6 +95,40 @@ class ParallelExecutor {
const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy) const;
void InitExecutorPrivateMemberInfo(const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy,
size_t device_count,
const ir::Graph &graph);
void CreateLocalScopes(Scope *global_scope,
const std::vector<Scope *> &local_scopes,
bool create_new);
std::unordered_map<Scope *, Scope *> CreateLocalExecScopes(
const std::vector<Scope *> &local_scopes, bool create_new);
std::vector<ir::Graph *> CloneGraphToMultiDevices(ir::Graph *graph);
void PrepareNCCLCommunicator(Scope *global_scope);
std::vector<ir::Graph *> CompileGraphWithBuildStrategy(
ir::Graph *graph, std::vector<ir::Graph *> *graphs,
const std::string &loss_var_name);
void CreateVariableInfos(std::vector<VariableInfo> *var_infos,
ir::Graph *graph);
std::vector<ir::Graph *> CreateSSAGraphExecutor(
const ExecutionStrategy &exec_strategy,
std::vector<ir::Graph *> *async_graphs, ir::Graph *graph);
void ResetOpHandleScopeMapOfGraphs(
const std::vector<ir::Graph *> &final_graphs,
const std::unordered_map<Scope *, Scope *> &scope_map);
void SetReaderOpDeviceInfoOfGraphs(
const std::vector<ir::Graph *> &final_graphs);
ParallelExecutorPrivate *member_;
std::vector<std::unique_ptr<ir::Graph>> async_graphs_;
};
......