From faa8c703ee7e51b4b1823677db2309e959f9fd15 Mon Sep 17 00:00:00 2001 From: Aurelius84 Date: Fri, 23 Apr 2021 19:56:01 +0800 Subject: [PATCH] Polish ParallelExectuor constructor into small functions (#32191) * Refine Constructor logic of ParallelExecutor * refine function name * refine code comment --- paddle/fluid/framework/parallel_executor.cc | 816 +++++++++++--------- paddle/fluid/framework/parallel_executor.h | 36 + 2 files changed, 486 insertions(+), 366 deletions(-) diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index dd93639f31..73a699b41c 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -27,7 +27,6 @@ limitations under the License. */ #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/op_handle_base.h" #include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h" -#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" #include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/ir/graph_helper.h" @@ -631,141 +630,15 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, InitP2P(places); ir::InitReaderQueueDeviceCount(graph, *(member_->global_scope_), member_->places_.size()); - member_->use_device_ = exec_strategy.use_device_; - member_->build_strategy_ = build_strategy; - member_->use_all_reduce_ = member_->build_strategy_.reduce_ == - BuildStrategy::ReduceStrategy::kAllReduce; - member_->nranks_ = build_strategy.num_trainers_ * places.size(); - if (!member_->use_all_reduce_ && member_->nranks_ == 1) { - LOG(INFO) << "If you set build_strategy.reduce with 'Reduce'," - "the number of places should be greater than 1."; - member_->build_strategy_.reduce_ = - BuildStrategy::ReduceStrategy::kAllReduce; - member_->use_all_reduce_ = true; - } -#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && defined(_WIN32) - if (member_->IsUseCUDA(member_->use_device_)) { - PADDLE_ENFORCE_EQ( - places.size(), 1, - platform::errors::Unavailable("Windows can support Single GPU only.")); - } -#endif - -#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \ - (!defined(PADDLE_WITH_NCCL) && !defined(PADDLE_WITH_RCCL)) - if (member_->IsUseCUDA(member_->use_device_)) { - PADDLE_ENFORCE_EQ( - places.size(), 1, - platform::errors::PermissionDenied( - "Your machine has multiple cards, " - "but the WITH_NCCL option is not turned on during compilation, " - "and you cannot use multi-card training or prediction. " - "Please recompile and turn on the WITH_NCCL option.")); - } -#endif - - std::string device_name; - if (member_->use_device_ == p::kCPU) { - device_name = "CPU"; - } else if (member_->use_device_ == p::kCUDA) { - device_name = "CUDA"; - } else { - device_name = "XPU"; - } - - VLOG(1) << string::Sprintf( - "The Program will be executed on %s using ParallelExecutor, %lu " - "cards are used, so %lu programs are executed in parallel.", - device_name, places.size(), places.size()); - - // Step 1. Bcast the bcast_vars to devs. - // Create local scopes - if (local_scopes.empty()) { - member_->own_local_scope_ = true; - member_->local_scopes_.emplace_back(member_->global_scope_); - for (size_t i = 1; i < member_->places_.size(); ++i) { - member_->local_scopes_.emplace_back(&scope->NewScope()); - } - } else { - member_->own_local_scope_ = false; - PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size(), - platform::errors::PreconditionNotMet( - "member_->places_.size() = %d is not equal to " - "local_scopes.size() = %d", - member_->places_.size(), local_scopes.size())); - for (size_t i = 0; i < member_->places_.size(); ++i) { - member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope()); - } - } - - std::vector graphs; - if (member_->build_strategy_.async_mode_) { - PADDLE_ENFORCE_EQ(member_->IsUseCUDA(member_->use_device_), false, - platform::errors::Unavailable( - "gpu mode does not support async_mode_ now!")); - graphs.push_back(graph); - for (size_t i = 1; i < places.size(); ++i) { - auto *tmp_graph = new ir::Graph(graph->OriginProgram()); - async_graphs_.emplace_back(tmp_graph); - graphs.push_back(tmp_graph); - } - } - - // FIXME(Yancey1989): parallel graph mode get better performance - // in GPU allreduce distributed training. Need an elegant way to - // choice the execution strategy. - member_->build_strategy_.enable_parallel_graph_ = - EnableParallelGraphExecution(*graph, exec_strategy, - member_->build_strategy_); - if (member_->build_strategy_.enable_parallel_graph_) { - LOG(INFO) << "The Executor would execute the graph by ParallelGraph " - "Execution which can get better performance," - << "you can force it off by env FLAGS_enable_parallel_graph=0"; - } + // Initialize necessary info of member_ with strategy. + InitExecutorPrivateMemberInfo(exec_strategy, build_strategy, places.size(), + *graph); - if (member_->IsUseCUDA(member_->use_device_) && member_->nranks_ > 1) { -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) - member_->InitOrGetNCCLCommunicator(scope, &member_->build_strategy_); - - // Initialize device context's nccl comm, will be used by normal - // Operators like sync_batch_norm, and collective ops. - // NOTE: more than one ParallelExecutor with same place, the nccl comm will - // be rewrite and there will be some problem. - // NOTE: NCCL group-calls and non-group-calls can not use the same - // NCCL communicator, so for ParallelGraph and Multi-Process mode, re-use - // same communicators. - auto *nccl_ctxs = - member_->nccl_ctxs_->GetSyncBatchNormCtx(scope, member_->places_); - auto &pool = platform::DeviceContextPool::Instance(); - for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) { - auto *dev_ctx = static_cast( - pool.Get(member_->places_[dev_id])); - auto &nccl_ctx = nccl_ctxs->at(member_->places_[dev_id]); - dev_ctx->set_nccl_comm(nccl_ctx.comm()); - } -#else - PADDLE_THROW( - platform::errors::PreconditionNotMet("Not compiled with CUDA.")); -#endif - } - if (member_->use_device_ == p::kXPU && member_->nranks_ > 1) { -#if defined(PADDLE_WITH_XPU_BKCL) - member_->InitOrGetBKCLCommunicator(scope, member_->build_strategy_); + // Step 1. Create local scopes and Clone graph into multi device + CreateLocalScopes(scope, local_scopes, /*create_new*/ true); + std::vector graphs = CloneGraphToMultiDevices(graph); + PrepareNCCLCommunicator(scope); - auto *bkcl_ctxs = - member_->bkcl_ctxs_->GetSyncBatchNormCtx(scope, member_->places_); - auto &pool = platform::DeviceContextPool::Instance(); - for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) { - auto *dev_ctx = static_cast( - pool.Get(member_->places_[dev_id])); - auto &bkcl_ctx = bkcl_ctxs->at(member_->places_[dev_id]); - dev_ctx->set_bkcl_context(bkcl_ctx.comm()); - } -#else - PADDLE_THROW( - platform::errors::PreconditionNotMet("Not compiled with XPU.")); -#endif - } // broadcast parameters from the 0th device to others: auto need_broadcast = [&]() -> bool { if (member_->build_strategy_.num_trainers_ > 1) { @@ -778,257 +651,75 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, } return false; }; - // Bcast Parameters to all GPUs if (need_broadcast()) { BCastParamsToDevices(bcast_vars, member_->build_strategy_.trainer_id_); } - // Startup Program has been run. All local scopes has correct parameters. - // Step 2. Convert main_program to SSA form and dependency graph. Also, insert // ncclOp - std::vector async_graphs(places.size()); -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) - if (member_->build_strategy_.async_mode_) { - VLOG(3) << "use local async mode"; - graph = member_->build_strategy_.Apply( - graph, {member_->places_[0]}, loss_var_name, - {member_->local_scopes_[0]}, 1, member_->use_device_, - member_->nccl_ctxs_); - for (size_t i = 1; i < member_->places_.size(); ++i) { - graphs[i] = member_->build_strategy_.Apply( - graphs[i], {member_->places_[i]}, loss_var_name, - {member_->local_scopes_[i]}, 1, member_->use_device_, - member_->nccl_ctxs_); - async_graphs[i] = graphs[i]; - } - } else { - graph = member_->build_strategy_.Apply( - graph, member_->places_, loss_var_name, member_->local_scopes_, - member_->nranks_, member_->use_device_, member_->nccl_ctxs_); - } -#elif defined(PADDLE_WITH_XPU_BKCL) - if (member_->build_strategy_.async_mode_) { - VLOG(3) << "use local async mode"; - graph = member_->build_strategy_.Apply( - graph, {member_->places_[0]}, loss_var_name, - {member_->local_scopes_[0]}, 1, member_->use_device_, - member_->bkcl_ctxs_); - for (size_t i = 1; i < member_->places_.size(); ++i) { - graphs[i] = member_->build_strategy_.Apply( - graphs[i], {member_->places_[i]}, loss_var_name, - {member_->local_scopes_[i]}, 1, member_->use_device_, - member_->bkcl_ctxs_); - async_graphs[i] = graphs[i]; - } - } else { - graph = member_->build_strategy_.Apply( - graph, member_->places_, loss_var_name, member_->local_scopes_, - member_->nranks_, member_->use_device_, member_->bkcl_ctxs_); - } -#else - if (member_->build_strategy_.async_mode_) { - VLOG(3) << "use local async mode"; - graph = member_->build_strategy_.Apply( - graph, {member_->places_[0]}, loss_var_name, - {member_->local_scopes_[0]}, 1, member_->use_device_); - for (size_t i = 1; i < member_->places_.size(); ++i) { - graphs[i] = member_->build_strategy_.Apply( - graphs[i], {member_->places_[i]}, loss_var_name, - {member_->local_scopes_[i]}, 1, member_->use_device_); - async_graphs[i] = graphs[i]; - } - } else { - graph = member_->build_strategy_.Apply( - graph, member_->places_, loss_var_name, member_->local_scopes_, - member_->nranks_, member_->use_device_); - } -#endif - + std::vector async_graphs = + CompileGraphWithBuildStrategy(graph, &graphs, loss_var_name); graph = member_->ApplyMemoryOptimizePass(graph); - async_graphs[0] = graph; // Step 3. Create vars in each scope. Passes may also create new vars. // skip control vars and empty vars std::vector var_infos; - for (auto &node : graph->Nodes()) { - if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { - var_infos.emplace_back(); - var_infos.back().name_ = node->Var()->Name(); - var_infos.back().type_ = node->Var()->GetType(); - var_infos.back().persistable_ = node->Var()->Persistable(); - - member_->is_persistable_.emplace(node->Var()->Name(), - node->Var()->Persistable()); - } - } - - if (graph->Has(details::kFusedVars)) { - auto &fused_vars = graph->Get(details::kFusedVars); - for (auto &fused_var : fused_vars) { - var_infos.emplace_back(); - var_infos.back() = fused_var.second; + CreateVariableInfos(&var_infos, graph); + std::unordered_map scope_map = + CreateLocalExecScopes(member_->local_scopes_, /*create_new*/ true); - member_->is_persistable_.emplace(fused_var.first, - fused_var.second.persistable_); - } - } + // Step 4. Create SSAGraph executor + std::vector final_graphs = + CreateSSAGraphExecutor(exec_strategy, &async_graphs, graph); - std::unordered_map scope_map; - for (auto *scope : member_->local_scopes_) { - auto &local_exec_scope = scope->NewScope(); - member_->local_exec_scopes_.emplace_back(&local_exec_scope); - scope_map.emplace(scope, &local_exec_scope); + VLOG(3) << "use ScopeBufferedSSAGraphExecutor"; + if (!member_->build_strategy_.async_mode_) { + member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, + std::move(var_infos), member_->places_, std::move(member_->executor_))); } - PADDLE_ENFORCE_EQ( - member_->local_scopes_.size(), member_->local_exec_scopes_.size(), - platform::errors::PreconditionNotMet( - "member_->local_scopes_.size() = %d is not equal to " - "member_->local_exec_scopes_.size() = %d", - member_->local_scopes_.size(), member_->local_exec_scopes_.size())); + ResetOpHandleScopeMapOfGraphs(final_graphs, scope_map); + SetReaderOpDeviceInfoOfGraphs(final_graphs); +} - std::vector final_graphs; +void ParallelExecutor::BCastParamsToDevices( + const std::vector &vars, int trainer_id) const { + VLOG(3) << "BCastParamsToDevices"; + // the initializing bcast, all vars would be bcast from device(0). + for (auto &var : vars) { + framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var); + if (main_var == nullptr || !main_var->IsType()) { + continue; + } - if (member_->build_strategy_.async_mode_) { - VLOG(3) << "use AsyncSSAGraphExecutor"; - member_->executor_.reset(new details::AsyncSSAGraphExecutor( - exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, - member_->places_, async_graphs)); - final_graphs = async_graphs; - } else if (member_->build_strategy_.enable_parallel_graph_) { - VLOG(3) << "use ParallelSSAGraphExecutor"; -#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) - // TODO(Yancey1989): Remove passing in the main_program when - // allreduce_seq_pass doesn't need it as the attr. - bool is_inference = details::IsDataParallelInferenceGraph(*graph); - bool has_drop_last_read_op = details::HasDropLastReadOp(*graph); + auto &main_tensor = main_var->Get(); + if (!main_tensor.IsInitialized()) { + VLOG(3) << "one in var not inited, return!"; + continue; + } + auto &dims = main_tensor.dims(); + if (paddle::platform::is_gpu_place(main_tensor.place())) { +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) + std::vector buffers; + buffers.reserve(member_->places_.size()); + size_t numel = main_tensor.numel(); + ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type()); + for (size_t i = 0; i < member_->places_.size(); ++i) { + auto place = member_->places_[i]; + void *buffer; - auto *pg_exe = new details::ParallelSSAGraphExecutor( - exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, - member_->places_, graph); - final_graphs = pg_exe->Graphs(); - member_->executor_.reset(pg_exe); - - if (is_inference && member_->places_.size() > 1) { - member_->inference_executor_ = pg_exe; - if (!has_drop_last_read_op) { - VLOG(5) << "Enable partial feed support in inference phase"; - pg_exe->EnablePartialFeedSupport(); - } - } -#else - PADDLE_THROW(platform::errors::PreconditionNotMet( - "Paddle should be compiled with CUDA for ParallelGraph Execution.")); -#endif - } else { - bool has_drop_last_read_op = details::HasDropLastReadOp(*graph); - auto possible_inference_graphs = - details::TrySeparateToMultipleSingleDeviceGraphs(graph); - if (!possible_inference_graphs.empty()) { - VLOG(5) << "Use ParallelSSAGraphExecutor in inference phase"; - auto *pg_exe = new details::ParallelSSAGraphExecutor( - exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, - member_->places_, std::move(possible_inference_graphs)); - if (!has_drop_last_read_op) { - VLOG(5) << "Enable partial feed support in inference phase"; - pg_exe->EnablePartialFeedSupport(); - } - final_graphs = pg_exe->Graphs(); - member_->executor_.reset(pg_exe); - member_->inference_executor_ = pg_exe; - } else { - LOG_IF(WARNING, details::HasKeepLastReadOp(*graph)) - << "drop_last=False for DataLoader is not supported in training " - "network. It is automatically turned to drop_last=True."; - if (exec_strategy.type_ == ExecutionStrategy::kDefault) { - VLOG(3) << "use ThreadedSSAGraphExecutor"; - member_->executor_.reset(new details::ThreadedSSAGraphExecutor( - exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, - member_->places_, graph)); - } else { - if (member_->use_device_ == p::kXPU) { -#if defined(PADDLE_WITH_XPU) - VLOG(3) << "use BindThreadedSSAGraphExecutor"; - member_->executor_.reset(new details::BindThreadedSSAGraphExecutor( - exec_strategy, member_->local_scopes_, - member_->local_exec_scopes_, member_->places_, graph)); -#else - PADDLE_THROW(platform::errors::PermissionDenied( - "Paddle can't use XPU device since it's not compiled with XPU," - "Please recompile or reinstall Paddle with XPU support.")); -#endif - } else { - VLOG(3) << "use FastThreadedSSAGraphExecutor"; - member_->executor_.reset(new details::FastThreadedSSAGraphExecutor( - exec_strategy, member_->local_scopes_, - member_->local_exec_scopes_, member_->places_, graph)); - } - } - final_graphs.emplace_back(graph); - } - } - - VLOG(3) << "use ScopeBufferedSSAGraphExecutor"; - if (!member_->build_strategy_.async_mode_) { - member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( - exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, - std::move(var_infos), member_->places_, std::move(member_->executor_))); - } - - for (auto *g : final_graphs) { - auto ops = ir::FilterByNodeWrapper(*g); - for (auto *op : ops) { - op->SetLocalExecScopes(scope_map); - } - } - - if (final_graphs.size() == 1) { - ir::SetReaderOpDeviceInfo(final_graphs[0], member_->places_.size()); - } else { - for (size_t i = 0; i < final_graphs.size(); ++i) { - ir::SetReaderOpDeviceInfo(final_graphs[i], member_->places_.size(), i); - } - } -} - -void ParallelExecutor::BCastParamsToDevices( - const std::vector &vars, int trainer_id) const { - VLOG(3) << "BCastParamsToDevices"; - // the initializing bcast, all vars would be bcast from device(0). - for (auto &var : vars) { - framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var); - if (main_var == nullptr || !main_var->IsType()) { - continue; - } - - auto &main_tensor = main_var->Get(); - if (!main_tensor.IsInitialized()) { - VLOG(3) << "one in var not inited, return!"; - continue; - } - auto &dims = main_tensor.dims(); - if (paddle::platform::is_gpu_place(main_tensor.place())) { -#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) - std::vector buffers; - buffers.reserve(member_->places_.size()); - size_t numel = main_tensor.numel(); - ncclDataType_t data_type = platform::ToNCCLDataType(main_tensor.type()); - for (size_t i = 0; i < member_->places_.size(); ++i) { - auto place = member_->places_[i]; - void *buffer; - - if (i == 0 && trainer_id == 0) { - buffer = const_cast(main_tensor.data()); - } else { - auto local_scope = member_->local_scopes_[i]; - auto *t = local_scope->Var(var)->GetMutable(); - t->Resize(dims); - buffer = t->mutable_data(place, main_tensor.type()); - } - buffers.push_back(buffer); - } + if (i == 0 && trainer_id == 0) { + buffer = const_cast(main_tensor.data()); + } else { + auto local_scope = member_->local_scopes_[i]; + auto *t = local_scope->Var(var)->GetMutable(); + t->Resize(dims); + buffer = t->mutable_data(place, main_tensor.type()); + } + buffers.push_back(buffer); + } PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(), platform::errors::PreconditionNotMet( @@ -1367,6 +1058,399 @@ bool ParallelExecutor::EnableParallelGraphExecution( return enable_parallel_graph; } +void ParallelExecutor::InitExecutorPrivateMemberInfo( + const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy, + size_t device_count, const ir::Graph &graph) { + member_->use_device_ = exec_strategy.use_device_; + member_->build_strategy_ = build_strategy; + member_->use_all_reduce_ = member_->build_strategy_.reduce_ == + BuildStrategy::ReduceStrategy::kAllReduce; + member_->nranks_ = build_strategy.num_trainers_ * device_count; + if (!member_->use_all_reduce_ && member_->nranks_ == 1) { + LOG(INFO) << "If you set build_strategy.reduce with 'Reduce'," + "the number of places should be greater than 1."; + member_->build_strategy_.reduce_ = + BuildStrategy::ReduceStrategy::kAllReduce; + member_->use_all_reduce_ = true; + } +#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && defined(_WIN32) + if (member_->IsUseCUDA(member_->use_device_)) { + PADDLE_ENFORCE_EQ( + device_count, 1, + platform::errors::Unavailable("Windows can support Single GPU only.")); + } +#endif + +#if (defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)) && \ + (!defined(PADDLE_WITH_NCCL) && !defined(PADDLE_WITH_RCCL)) + if (member_->IsUseCUDA(member_->use_device_)) { + PADDLE_ENFORCE_EQ( + device_count, 1, + platform::errors::PermissionDenied( + "Your machine has multiple cards, " + "but the WITH_NCCL option is not turned on during compilation, " + "and you cannot use multi-card training or prediction. " + "Please recompile and turn on the WITH_NCCL option.")); + } +#endif + + std::string device_name; + if (member_->use_device_ == p::kCPU) { + device_name = "CPU"; + } else if (member_->use_device_ == p::kCUDA) { + device_name = "CUDA"; + } else { + device_name = "XPU"; + } + + VLOG(1) << string::Sprintf( + "The Program will be executed on %s using ParallelExecutor, %lu " + "cards are used, so %lu programs are executed in parallel.", + device_name, device_count, device_count); + + // FIXME(Yancey1989): parallel graph mode get better performance + // in GPU allreduce distributed training. Need an elegant way to + // choice the execution strategy. + member_->build_strategy_.enable_parallel_graph_ = + EnableParallelGraphExecution(graph, exec_strategy, + member_->build_strategy_); + if (member_->build_strategy_.enable_parallel_graph_) { + LOG(INFO) << "The Executor would execute the graph by ParallelGraph " + "Execution which can get better performance," + << "you can force it off by env FLAGS_enable_parallel_graph=0"; + } +} + +void ParallelExecutor::CreateLocalScopes( + Scope *global_scope, const std::vector &local_scopes, + bool create_new) { + if (local_scopes.empty()) { + member_->own_local_scope_ = true; + member_->local_scopes_.emplace_back(global_scope); + for (size_t i = 1; i < member_->places_.size(); ++i) { + member_->local_scopes_.emplace_back(&global_scope->NewScope()); + } + } else { + member_->own_local_scope_ = false; + PADDLE_ENFORCE_EQ(member_->places_.size(), local_scopes.size(), + platform::errors::PreconditionNotMet( + "member_->places_.size() = %d is not equal to " + "local_scopes.size() = %d", + member_->places_.size(), local_scopes.size())); + for (size_t i = 0; i < member_->places_.size(); ++i) { + if (create_new) { + member_->local_scopes_.emplace_back(&local_scopes[i]->NewScope()); + } else { + // Use local scopes directly + member_->local_scopes_.emplace_back(local_scopes[i]); + } + } + } +} + +std::unordered_map ParallelExecutor::CreateLocalExecScopes( + const std::vector &local_scopes, bool create_new) { + std::unordered_map scope_map; + + for (auto *scope : local_scopes) { + Scope *local_exec_scope = scope; + if (create_new) { + local_exec_scope = &scope->NewScope(); + } + member_->local_exec_scopes_.emplace_back(local_exec_scope); + scope_map.emplace(scope, local_exec_scope); + } + + PADDLE_ENFORCE_EQ( + member_->local_scopes_.size(), member_->local_exec_scopes_.size(), + platform::errors::PreconditionNotMet( + "member_->local_scopes_.size() = %d is not equal to " + "member_->local_exec_scopes_.size() = %d", + member_->local_scopes_.size(), member_->local_exec_scopes_.size())); + + return scope_map; +} + +std::vector ParallelExecutor::CloneGraphToMultiDevices( + ir::Graph *graph) { + std::vector graphs; + if (member_->build_strategy_.async_mode_) { + PADDLE_ENFORCE_EQ(member_->IsUseCUDA(member_->use_device_), false, + platform::errors::Unavailable( + "gpu mode does not support async_mode_ now!")); + graphs.push_back(graph); + for (size_t i = 1; i < member_->places_.size(); ++i) { + auto *tmp_graph = new ir::Graph(graph->OriginProgram()); + async_graphs_.emplace_back(tmp_graph); + graphs.push_back(tmp_graph); + } + } + + return graphs; +} + +void ParallelExecutor::PrepareNCCLCommunicator(Scope *global_scope) { + if (member_->IsUseCUDA(member_->use_device_) && member_->nranks_ > 1) { +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) + member_->InitOrGetNCCLCommunicator(global_scope, &member_->build_strategy_); + + // Initialize device context's nccl comm, will be used by normal + // Operators like sync_batch_norm, and collective ops. + // NOTE: more than one ParallelExecutor with same place, the nccl comm will + // be rewrite and there will be some problem. + // NOTE: NCCL group-calls and non-group-calls can not use the same + // NCCL communicator, so for ParallelGraph and Multi-Process mode, re-use + // same communicators. + auto *nccl_ctxs = member_->nccl_ctxs_->GetSyncBatchNormCtx( + global_scope, member_->places_); + auto &pool = platform::DeviceContextPool::Instance(); + for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) { + auto *dev_ctx = static_cast( + pool.Get(member_->places_[dev_id])); + auto &nccl_ctx = nccl_ctxs->at(member_->places_[dev_id]); + dev_ctx->set_nccl_comm(nccl_ctx.comm()); + } +#else + PADDLE_THROW( + platform::errors::PreconditionNotMet("Not compiled with CUDA.")); +#endif + } + if (member_->use_device_ == p::kXPU && member_->nranks_ > 1) { +#if defined(PADDLE_WITH_XPU_BKCL) + member_->InitOrGetBKCLCommunicator(global_scope, member_->build_strategy_); + + auto *bkcl_ctxs = member_->bkcl_ctxs_->GetSyncBatchNormCtx( + global_scope, member_->places_); + auto &pool = platform::DeviceContextPool::Instance(); + for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) { + auto *dev_ctx = static_cast( + pool.Get(member_->places_[dev_id])); + auto &bkcl_ctx = bkcl_ctxs->at(member_->places_[dev_id]); + dev_ctx->set_bkcl_context(bkcl_ctx.comm()); + } +#else + PADDLE_THROW( + platform::errors::PreconditionNotMet("Not compiled with XPU.")); +#endif + } +} + +std::vector ParallelExecutor::CompileGraphWithBuildStrategy( + ir::Graph *graph, std::vector *device_graphs, + const std::string &loss_var_name) { + auto device_count = member_->places_.size(); + std::vector async_graphs(device_count); + + auto &graphs = *device_graphs; +#if defined(PADDLE_WITH_NCCL) || defined(PADDLE_WITH_RCCL) + if (member_->build_strategy_.async_mode_) { + PADDLE_ENFORCE_EQ(graphs.size(), device_count, + platform::errors::PreconditionNotMet( + "graphs.size() shoule be %d, but received %d", + device_count, graphs.size())); + VLOG(3) << "use local async mode"; + graph = member_->build_strategy_.Apply( + graph, {member_->places_[0]}, loss_var_name, + {member_->local_scopes_[0]}, 1, member_->use_device_, + member_->nccl_ctxs_); + for (size_t i = 1; i < device_count; ++i) { + graphs[i] = member_->build_strategy_.Apply( + graphs[i], {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, 1, member_->use_device_, + member_->nccl_ctxs_); + async_graphs[i] = graphs[i]; + } + } else { + graph = member_->build_strategy_.Apply( + graph, member_->places_, loss_var_name, member_->local_scopes_, + member_->nranks_, member_->use_device_, member_->nccl_ctxs_); + } +#elif defined(PADDLE_WITH_XPU_BKCL) + if (member_->build_strategy_.async_mode_) { + PADDLE_ENFORCE_EQ(graphs.size(), device_count, + platform::errors::PreconditionNotMet( + "graphs.size() shoule be %d, but received %d", + device_count, graphs.size())); + VLOG(3) << "use local async mode"; + graph = member_->build_strategy_.Apply( + graph, {member_->places_[0]}, loss_var_name, + {member_->local_scopes_[0]}, 1, member_->use_device_, + member_->bkcl_ctxs_); + for (size_t i = 1; i < device_count; ++i) { + graphs[i] = member_->build_strategy_.Apply( + graphs[i], {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, 1, member_->use_device_, + member_->bkcl_ctxs_); + async_graphs[i] = graphs[i]; + } + } else { + graph = member_->build_strategy_.Apply( + graph, member_->places_, loss_var_name, member_->local_scopes_, + member_->nranks_, member_->use_device_, member_->bkcl_ctxs_); + } +#else + if (member_->build_strategy_.async_mode_) { + VLOG(3) << "use local async mode"; + graph = member_->build_strategy_.Apply( + graph, {member_->places_[0]}, loss_var_name, + {member_->local_scopes_[0]}, 1, member_->use_device_); + for (size_t i = 1; i < device_count; ++i) { + graphs[i] = member_->build_strategy_.Apply( + graphs[i], {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, 1, member_->use_device_); + async_graphs[i] = graphs[i]; + } + } else { + graph = member_->build_strategy_.Apply( + graph, member_->places_, loss_var_name, member_->local_scopes_, + member_->nranks_, member_->use_device_); + } +#endif + + return async_graphs; +} + +void ParallelExecutor::CreateVariableInfos( + std::vector *var_infos, ir::Graph *graph) { + PADDLE_ENFORCE_EQ( + var_infos->size(), 0, + platform::errors::PreconditionNotMet( + "var_infos->size() shoule be 0, but received %d", var_infos->size())); + PADDLE_ENFORCE_EQ( + member_->is_persistable_.size(), 0, + platform::errors::PreconditionNotMet( + "member_->is_persistable_.size() shoule be 0, but received %d", + member_->is_persistable_.size())); + for (auto &node : graph->Nodes()) { + if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { + var_infos->emplace_back(); + var_infos->back().name_ = node->Var()->Name(); + var_infos->back().type_ = node->Var()->GetType(); + var_infos->back().persistable_ = node->Var()->Persistable(); + + member_->is_persistable_.emplace(node->Var()->Name(), + node->Var()->Persistable()); + } + } + + if (graph->Has(details::kFusedVars)) { + auto &fused_vars = graph->Get(details::kFusedVars); + for (auto &fused_var : fused_vars) { + var_infos->emplace_back(); + var_infos->back() = fused_var.second; + + member_->is_persistable_.emplace(fused_var.first, + fused_var.second.persistable_); + } + } +} + +std::vector ParallelExecutor::CreateSSAGraphExecutor( + const ExecutionStrategy &exec_strategy, + std::vector *async_graphs, ir::Graph *graph) { + std::vector final_graphs; + + if (member_->build_strategy_.async_mode_) { + VLOG(3) << "use AsyncSSAGraphExecutor"; + member_->executor_.reset(new details::AsyncSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, + member_->places_, *async_graphs)); + final_graphs = *async_graphs; + } else if (member_->build_strategy_.enable_parallel_graph_) { + VLOG(3) << "use ParallelSSAGraphExecutor"; +#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP) + // TODO(Yancey1989): Remove passing in the main_program when + // allreduce_seq_pass doesn't need it as the attr. + bool is_inference = details::IsDataParallelInferenceGraph(*graph); + bool has_drop_last_read_op = details::HasDropLastReadOp(*graph); + + auto *pg_exe = new details::ParallelSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, + member_->places_, graph); + final_graphs = pg_exe->Graphs(); + member_->executor_.reset(pg_exe); + + if (is_inference && member_->places_.size() > 1) { + member_->inference_executor_ = pg_exe; + if (!has_drop_last_read_op) { + VLOG(5) << "Enable partial feed support in inference phase"; + pg_exe->EnablePartialFeedSupport(); + } + } +#else + PADDLE_THROW(platform::errors::PreconditionNotMet( + "Paddle should be compiled with CUDA for ParallelGraph Execution.")); +#endif + } else { + bool has_drop_last_read_op = details::HasDropLastReadOp(*graph); + auto possible_inference_graphs = + details::TrySeparateToMultipleSingleDeviceGraphs(graph); + if (!possible_inference_graphs.empty()) { + VLOG(5) << "Use ParallelSSAGraphExecutor in inference phase"; + auto *pg_exe = new details::ParallelSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, + member_->places_, std::move(possible_inference_graphs)); + if (!has_drop_last_read_op) { + VLOG(5) << "Enable partial feed support in inference phase"; + pg_exe->EnablePartialFeedSupport(); + } + final_graphs = pg_exe->Graphs(); + member_->executor_.reset(pg_exe); + member_->inference_executor_ = pg_exe; + } else { + LOG_IF(WARNING, details::HasKeepLastReadOp(*graph)) + << "drop_last=False for DataLoader is not supported in training " + "network. It is automatically turned to drop_last=True."; + if (exec_strategy.type_ == ExecutionStrategy::kDefault) { + VLOG(3) << "use ThreadedSSAGraphExecutor"; + member_->executor_.reset(new details::ThreadedSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, + member_->places_, graph)); + } else { + VLOG(3) << "use FastThreadedSSAGraphExecutor"; + member_->executor_.reset(new details::FastThreadedSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->local_exec_scopes_, + member_->places_, graph)); + } + final_graphs.emplace_back(graph); + } + } + return final_graphs; +} + +void ParallelExecutor::ResetOpHandleScopeMapOfGraphs( + const std::vector &final_graphs, + const std::unordered_map &scope_map) { + PADDLE_ENFORCE_GE( + final_graphs.size(), 1, + platform::errors::PreconditionNotMet( + "final_graphs shoule contain at least one graph, but received %d", + final_graphs.size())); + + PADDLE_ENFORCE_GT(scope_map.size(), 0, + platform::errors::PreconditionNotMet( + "scope_map shoule contain at least one " + "element, but received %d", + scope_map.size())); + for (auto *g : final_graphs) { + auto ops = ir::FilterByNodeWrapper(*g); + for (auto *op : ops) { + op->SetLocalExecScopes(scope_map); + } + } +} + +void ParallelExecutor::SetReaderOpDeviceInfoOfGraphs( + const std::vector &final_graphs) { + if (final_graphs.size() == 1) { + ir::SetReaderOpDeviceInfo(final_graphs[0], member_->places_.size()); + } else { + for (size_t i = 0; i < final_graphs.size(); ++i) { + ir::SetReaderOpDeviceInfo(final_graphs[i], member_->places_.size(), i); + } + } +} + const ir::Graph &ParallelExecutor::Graph() const { return member_->executor_->Graph(); } diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 47de7dc48f..d4d0b534b5 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -24,6 +24,7 @@ limitations under the License. */ #include "paddle/fluid/framework/details/build_strategy.h" #include "paddle/fluid/framework/details/execution_strategy.h" #include "paddle/fluid/framework/details/op_handle_base.h" +#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" #include "paddle/fluid/framework/executor.h" #include "paddle/fluid/framework/feed_fetch_type.h" #include "paddle/fluid/framework/op_info.h" @@ -41,6 +42,7 @@ namespace framework { class ParallelExecutorPrivate; +using details::VariableInfo; using details::BuildStrategy; using details::ExecutionStrategy; namespace p = paddle::platform; @@ -93,6 +95,40 @@ class ParallelExecutor { const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy) const; + void InitExecutorPrivateMemberInfo(const ExecutionStrategy &exec_strategy, + const BuildStrategy &build_strategy, + size_t device_count, + const ir::Graph &graph); + + void CreateLocalScopes(Scope *global_scope, + const std::vector &local_scopes, + bool create_new); + + std::unordered_map CreateLocalExecScopes( + const std::vector &local_scopes, bool create_new); + + std::vector CloneGraphToMultiDevices(ir::Graph *graph); + + void PrepareNCCLCommunicator(Scope *global_scope); + + std::vector CompileGraphWithBuildStrategy( + ir::Graph *graph, std::vector *graphs, + const std::string &loss_var_name); + + void CreateVariableInfos(std::vector *var_infos, + ir::Graph *graph); + + std::vector CreateSSAGraphExecutor( + const ExecutionStrategy &exec_strategy, + std::vector *async_graphs, ir::Graph *graph); + + void ResetOpHandleScopeMapOfGraphs( + const std::vector &final_graphs, + const std::unordered_map &scope_map); + + void SetReaderOpDeviceInfoOfGraphs( + const std::vector &final_graphs); + ParallelExecutorPrivate *member_; std::vector> async_graphs_; }; -- GitLab