diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc
index 231abac97194e7455c120422d8b435a974ff3bfd..774be6c24c7e19b630b7d9e3c2f46ef81d8a9f09 100644
--- a/paddle/fluid/framework/details/build_strategy.cc
+++ b/paddle/fluid/framework/details/build_strategy.cc
@@ -206,8 +206,7 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
         graph->Erase(kAllOpDescs);
       }
 
-      graph->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs,
-                                                      &all_ops);  // take ownership
+      graph->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs, &all_ops);
 
       pass->Erase(kAllOpDescs);
       pass->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs, &all_ops);
diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc
index 18b455cc6c348fbc7e1df8abd76208605c316193..46332a8f23d3fe4deb0c9382a3d4b9dc850fdddc 100644
--- a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc
@@ -20,7 +20,7 @@ namespace framework {
 namespace details {
 
 std::vector<std::unique_ptr<ir::Graph>>
-ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(ir::Graph* graph) {
+ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(ir::Graph *graph) {
   std::vector<std::unique_ptr<ir::Graph>> graphs;
   graphs.reserve(places_.size());
   for (size_t i = 0; i < places_.size(); ++i) {
@@ -76,13 +76,12 @@ ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(ir::Graph* graph) {
 
 ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
     const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
-    const std::vector<platform::Place> &places,
-    const framework::ProgramDesc &main_prog, ir::Graph* graph)
+    const std::vector<platform::Place> &places, ir::Graph *graph)
     : strategy_(std::move(strategy)),
       local_scopes_(std::move(local_scopes)),
       pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr),
       places_(std::move(places)),
-      main_prog_(main_prog),
+      main_prog_(graph->OriginProgram()),
       // TODO(Yancey1989): Copying graphs is not safely since it deleted the
       // attrs.
       graphs_(SeparateMultiDevicesGraph(graph)) {
diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.h b/paddle/fluid/framework/details/parallel_ssa_graph_executor.h
index a1547878a58480f2d8c980a571ec3216e72971c8..a7a792dabd5992ac5f9b966f43016f5f05a70522 100644
--- a/paddle/fluid/framework/details/parallel_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.h
@@ -31,8 +31,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor {
   ParallelSSAGraphExecutor(const ExecutionStrategy &strategy,
                            const std::vector<Scope *> &local_scopes,
                            const std::vector<platform::Place> &places,
-                           const framework::ProgramDesc &main_prog,
-                           ir::Graph* graph);
+                           ir::Graph *graph);
   ~ParallelSSAGraphExecutor() final = default;
 
   const ir::Graph &Graph() const override { return *graphs_[0]; }
@@ -41,7 +40,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor {
 
  private:
   std::vector<std::unique_ptr<ir::Graph>> SeparateMultiDevicesGraph(
-      ir::Graph* graph);
+      ir::Graph *graph);
 
   ExecutionStrategy strategy_;
   std::vector<Scope *> local_scopes_;
diff --git a/paddle/fluid/framework/ir/graph.h b/paddle/fluid/framework/ir/graph.h
index 6b8115b295f6881e48e10cf9489469283b1c8c50..7e783f74ff44219665530ed330fad6b0e286ad2a 100644
--- a/paddle/fluid/framework/ir/graph.h
+++ b/paddle/fluid/framework/ir/graph.h
@@ -195,22 +195,12 @@ class Graph {
     return nullptr;
   }
 
-<<<<<<< HEAD
-=======
   // Returns reference to the original program.
   // WARN: After a series of passes, the current graph can be quite
   // different from OriginProgram. Caller shouldn't assume much from
   // the returned OriginProgram.
   const ProgramDesc &OriginProgram() const { return program_; }
 
-  void ResolveHazard(
-      const std::map<std::string, std::vector<ir::Node *>> &var_nodes);
-
- private:
-  std::map<std::string, std::vector<ir::Node *>> InitFromProgram(
-      const ProgramDesc &program);
-
->>>>>>> polish
   // This method takes ownership of `node`.
   ir::Node *AddNode(ir::Node *node) {
     PADDLE_ENFORCE(node_set_.find(node) == node_set_.end());
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 2e68a2dd0fa3cb525c55e6aa76fdd9aaf53e9550..3e1d61813ca83ebdf9435036117e79abe501b24b 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -184,9 +184,10 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
 ParallelExecutor::ParallelExecutor(
     const std::vector<platform::Place> &places,
     const std::unordered_set<std::string> &bcast_vars,
-    const std::vector<ir::Graph *> &graphs, const std::string &loss_var_name,
-    Scope *scope, const std::vector<Scope *> &local_scopes,
-    const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy)
+    const std::string &loss_var_name, Scope *scope,
+    const std::vector<Scope *> &local_scopes,
+    const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
+    ir::Graph *graph)
     : member_(new ParallelExecutorPrivate(places)) {
   member_->global_scope_ = scope;
   member_->use_cuda_ = exec_strategy.use_cuda_;
@@ -216,34 +217,17 @@ ParallelExecutor::ParallelExecutor(
     }
   }
 
-<<<<<<< HEAD
   std::unique_ptr<ir::Graph> temp_owned_graph(graph);
 
   // FIXME(Yancey1989): parallel graph mode get better performance
   // in GPU allreduce distributed training. Need an elegant way to
   // choice the execution strategy.
-  build_strategy.enable_parallel_graph_ =
-      EnableParallelGraphExecution(*temp_owned_graph, exec_strategy, build_strategy);
+  build_strategy.enable_parallel_graph_ = EnableParallelGraphExecution(
+      *temp_owned_graph, exec_strategy, build_strategy);
   if (build_strategy.enable_parallel_graph_)
     VLOG(0) << "The Executor would execute the graph by ParallelGraph "
                "Execution which can get better performance,"
             << "you can force it off by env FLAGS_enable_parallel_graph=0";
-=======
-  // TODO(panyx0718): Update pass interface so we don't need this here.
-  std::vector<std::unique_ptr<ir::Graph>> temp_owned_graphs;
-  for (ir::Graph *g : graphs) {
-    temp_owned_graphs.emplace_back(g);
-  }
-<<<<<<< HEAD
->>>>>>> fix parallel graph mode program
-
-=======
-  bool parallel_graphs = (temp_owned_graphs.size() > 1);
-  if (parallel_graphs) {
-    PADDLE_ENFORCE_EQ(temp_owned_graphs.size(), places.size());
-  }
-  VLOG(1) << "Enable ParallelGraph Execution: " << parallel_graphs;
->>>>>>> polish
 
   if (member_->use_cuda_) {
     // Bcast Parameters to all GPUs
@@ -255,7 +239,7 @@ ParallelExecutor::ParallelExecutor(
     if (nccl_id_var != nullptr) {
       nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
     }
-    if (parallel_graphs && member_->nranks_ > 1UL) {
+    if (build_strategy.enable_parallel_graph_ && member_->nranks_ > 1UL) {
       if (nccl_id == nullptr) {
         local_nccl_id_.reset(new ncclUniqueId());
         platform::dynload::ncclGetUniqueId(local_nccl_id_.get());
@@ -273,105 +257,54 @@ ParallelExecutor::ParallelExecutor(
   if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
     BCastParamsToDevices(bcast_vars);
   }
-  // Startup Program has been run. All local scopes has correct parameters.
+// Startup Program has been run. All local scopes has correct parameters.
 
-  // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
-  // ncclOp
-<<<<<<< HEAD
-  std::unique_ptr<ir::Graph> graph;
+// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
+// ncclOp
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
-  temp_owned_graph = build_strategy.Apply(std::move(temp_owned_graph), member_->places_, loss_var_name,
-                                          member_->local_scopes_, member_->nranks_,
-                                          member_->use_cuda_, member_->nccl_ctxs_.get());
-#else
-  temp_owned_graph = build_strategy.Apply(std::move(temp_owned_graph), member_->places_, loss_var_name,
-                                          member_->local_scopes_, member_->nranks_,
-                                          member_->use_cuda_);
-
-=======
-  std::vector<ir::Graph *> compiled_graphs;
-#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
-  if (parallel_graphs) {
-    for (size_t i = 0; i < member_->places_.size(); ++i) {
-      auto temp_owned_graph = build_strategy.Apply(
-          std::move(temp_owned_graphs[i]), {member_->places_[i]}, loss_var_name,
-          {member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_,
-          member_->nccl_ctxs_.get());
-      compiled_graphs.push_back(temp_owned_graph.release());
-    }
-  } else {
-    auto temp_owned_graph = build_strategy.Apply(
-        std::move(temp_owned_graphs[0]), member_->places_, loss_var_name,
-        member_->local_scopes_, member_->nranks_, member_->use_cuda_,
-        member_->nccl_ctxs_.get());
-    compiled_graphs.push_back(temp_owned_graph.release());
-  }
+  temp_owned_graph = build_strategy.Apply(
+      std::move(temp_owned_graph), member_->places_, loss_var_name,
+      member_->local_scopes_, member_->nranks_, member_->use_cuda_,
+      member_->nccl_ctxs_.get());
 #else
-  auto temp_owned_graph = build_strategy.Apply(
-      std::move(temp_owned_graphs[0]), member_->places_, loss_var_name,
+  temp_owned_graph = build_strategy.Apply(
+      std::move(temp_owned_graph), member_->places_, loss_var_name,
       member_->local_scopes_, member_->nranks_, member_->use_cuda_);
-  compiled_graphs.push_back(temp_owned_graph.release());
->>>>>>> fix parallel graph mode program
+
 #endif
   auto max_memory_size = GetEagerDeletionThreshold();
   VLOG(10) << "Eager Deletion Threshold "
            << static_cast<float>(max_memory_size) / (1 << 30);
   if (max_memory_size >= 0) {
-<<<<<<< HEAD
-    graph = member_->PrepareGCAndRefCnts(std::move(graph),
-                                         static_cast<size_t>(max_memory_size)).release();
-=======
-    for (size_t i = 0; i < graphs.size(); ++i) {
-      compiled_graphs[i] =
-          member_
-              ->PrepareGCAndRefCnts(
-                  std::unique_ptr<ir::Graph>(compiled_graphs[i]),
-                  static_cast<size_t>(max_memory_size))
-              .release();
-    }
->>>>>>> fix parallel graph mode program
+    graph = member_
+                ->PrepareGCAndRefCnts(std::move(temp_owned_graph),
+                                      static_cast<size_t>(max_memory_size))
+                .release();
+  } else {
+    graph = temp_owned_graph.release();
   }
 
   // Step 3. Create vars in each scope. Passes may also create new vars.
   // skip control vars and empty vars
   std::vector<details::VariableInfo> var_infos;
-<<<<<<< HEAD
   for (auto &node : graph->Nodes()) {
     if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
       var_infos.emplace_back();
      var_infos.back().name_ = node->Var()->Name();
       var_infos.back().type_ = node->Var()->GetType();
       var_infos.back().persistable_ = node->Var()->Persistable();
-=======
-  for (auto &graph : compiled_graphs) {
-    for (auto &node : graph->Nodes()) {
-      if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
-        var_infos.emplace_back();
-        var_infos.back().name_ = node->Var()->Name();
-        var_infos.back().type_ = node->Var()->GetType();
-        var_infos.back().persistable_ = node->Var()->Persistable();
-      }
->>>>>>> fix parallel graph mode program
     }
   }
 
   // If the loss_var_name is given, the number of graph should be only one.
   if (loss_var_name.size()) {
-<<<<<<< HEAD
     size_t graph_num = ir::GraphNum(*graph);
-=======
-    size_t graph_num = ir::GraphNum(*compiled_graphs[0]);
->>>>>>> fix parallel graph mode program
     if (graph_num > 1) {
       LOG(WARNING)
           << "The number of graph should be only one, "
              "but the current graph has "
-<<<<<<< HEAD
           << ir::GraphNum(*graph)
-=======
-          << ir::GraphNum(*compiled_graphs[0])
->>>>>>> fix parallel graph mode program
           << " sub_graphs. If you want to see the nodes of the "
              "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
             "to specify the output dir. NOTES: if you not do training, "
@@ -379,18 +312,12 @@ ParallelExecutor::ParallelExecutor(
     }
   }
 
-<<<<<<< HEAD
   if (build_strategy.enable_parallel_graph_) {
 #ifdef PADDLE_WITH_CUDA
     // TODO(Yancey1989): Remove passing in the main_program when
     // allreduce_seq_pass doesn't need it as the attr.
-=======
-  if (parallel_graphs) {
->>>>>>> polish
     member_->executor_.reset(new details::ParallelSSAGraphExecutor(
-<<<<<<< HEAD
-        exec_strategy, member_->local_scopes_, member_->places_, main_program,
-        graph));
+        exec_strategy, member_->local_scopes_, member_->places_, graph));
 #else
     PADDLE_THROW(
         "Paddle should be compiled with CUDA for ParallelGraph Execution.");
@@ -402,19 +329,6 @@ ParallelExecutor::ParallelExecutor(
   } else {
     member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
         exec_strategy, member_->local_scopes_, member_->places_, graph));
-=======
-        exec_strategy, member_->local_scopes_, member_->places_,
-        compiled_graphs));
-  } else {
-    if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
-      member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
-          exec_strategy, member_->local_scopes_, member_->places_,
-          compiled_graphs[0]));
-    } else {
-      member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
-          exec_strategy, member_->local_scopes_, member_->places_,
-          compiled_graphs[0]));
->>>>>>> fix parallel graph mode program
     }
   }
 
@@ -551,9 +465,9 @@ ParallelExecutor::~ParallelExecutor() {
   delete member_;
 }
 
-bool EnableParallelGraphExecution(const ir::Graph &graph,
-                                  const ExecutionStrategy &exec_strategy,
-                                  const BuildStrategy &build_strategy) {
+bool ParallelExecutor::EnableParallelGraphExecution(
+    const ir::Graph &graph, const ExecutionStrategy &exec_strategy,
+    const BuildStrategy &build_strategy) const {
   if (!FLAGS_enable_parallel_graph) return false;
 
   bool enable_parallel_graph = true;
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index a6c0d65c016b1e5670bdcb3867bd66b6b0b6cdd4..ddf60b39466e72822142e1dad2cfe9a97b6cf6f2 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -46,11 +46,11 @@ class ParallelExecutor {
  public:
   explicit ParallelExecutor(const std::vector<platform::Place> &places,
                             const std::unordered_set<std::string> &bcast_vars,
-                            const std::vector<ir::Graph *> &graphs,
                             const std::string &loss_var_name, Scope *scope,
                             const std::vector<Scope *> &local_scopes,
                             const ExecutionStrategy &exec_strategy,
-                            const BuildStrategy &build_strategy);
+                            const BuildStrategy &build_strategy,
+                            ir::Graph *graph);
 
   ~ParallelExecutor();
@@ -71,6 +71,9 @@ class ParallelExecutor {
  private:
   void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
 
+  bool EnableParallelGraphExecution(const ir::Graph &graph,
+                                    const ExecutionStrategy &exec_strategy,
+                                    const BuildStrategy &build_strategy) const;
 
   ParallelExecutorPrivate *member_;
 #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
@@ -78,9 +81,5 @@ class ParallelExecutor {
 #endif
 };
 
-bool EnableParallelGraphExecution(const ir::Graph &graph,
-                                  const ExecutionStrategy &exec_strategy,
-                                  const BuildStrategy &build_strategy);
-
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index ccbdb1ab110aff9630e09b888460a6c4b36d875c..fd74dd3d0f9a0bf973644f45a235d9840de914c3 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -976,8 +976,6 @@ All parameter, weight, gradient are variables in Paddle.
       [](ir::PassBuilder &self, size_t idx) { self.RemovePass(idx); });
 
   // -- python binds for parallel executor.
-  m.def("_enable_parallel_graph_execution",
-        framework::EnableParallelGraphExecution);
   py::class_<ParallelExecutor> pe(m, "ParallelExecutor");
 
   py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy", R"DOC(
@@ -1216,10 +1214,9 @@ All parameter, weight, gradient are variables in Paddle.
         cannot be updated after being finalized.)DOC");
 
   pe.def(py::init<const std::vector<platform::Place> &,
-                  const std::unordered_set<std::string> &,
-                  const std::vector<ir::Graph *> &, const std::string &,
+                  const std::unordered_set<std::string> &, const std::string &,
                   Scope *, std::vector<Scope *> &, const ExecutionStrategy &,
-                  const BuildStrategy &>())
+                  const BuildStrategy &, ir::Graph *>())
       // NOTE: even we return a vec<Scope*>* to Python use reference policy.
       // We still cannot get local_scope from this vector, since the element
       // of vec<Scope*> will be freed by Python GC. We can only return Scope*
diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py
index acea09e95751dc7531ca1abf39176d275fbc81fa..d7975fe88612305dfb9326eed4b666c486034924 100644
--- a/python/paddle/fluid/compiler.py
+++ b/python/paddle/fluid/compiler.py
@@ -198,7 +198,6 @@ class CompiledProgram(object):
         if self._build_strategy.enable_inplace is None:
             self._build_strategy.enable_inplace = False if self._program and self._program._is_mem_optimized else True
-
         # TODO(wuyi): trainer endpoings should be passed in through
         # build_strategy, not program.xxx.
         if self._program and self._build_strategy.num_trainers > 1 and \
@@ -219,26 +218,13 @@ class CompiledProgram(object):
 
         places = list(map(_place_obj, self._places))
 
-        # FIXME(Yancey1989): parallel graph mode get better performance
-        # in GPU allreduce distributed training. Need an elegant way to
-        # choice the execution strategy.
-        enable_parallel_graph = \
-            core._enable_parallel_graph_execution(self._graph,
-                                                  self._exec_strategy,
-                                                  self._build_strategy) and \
-            self._program  # only supported if compile program not graph.
-
-        self._pe_graphs = [self._graph]
-        if enable_parallel_graph:
-            for _ in range(len(places) - 1):
-                self._pe_graphs.append(core.Graph(self._program_desc))
-
-        return core.ParallelExecutor(
+        pe = core.ParallelExecutor(
             places,
-            set(self._persistable_vars), self._pe_graphs,
+            set(self._persistable_vars),
             cpt.to_text(self._loss_name)
             if self._loss_name else six.u(''), self._scope, self._local_scopes,
-            self._exec_strategy, self._build_strategy)
+            self._exec_strategy, self._build_strategy, self._graph)
+        return pe
 
     def _compile_inference(self):
         return core.create_paddle_predictor(self._infer_config)
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index 1d513c6eadc641819f0da1d09694a67b6652b708..730b3f517312ced280f2fcfc0db263995e67cfec 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -186,12 +186,12 @@ class ParallelExecutor(object):
 
         # step7: init ParallelExecutor
         # ParallelExecutor API will be deprecated, don't support parallel graph.
-        self._graphs = [core.Graph(main.desc)]
+        self._graph = core.Graph(main.desc)
         self.executor = core.ParallelExecutor(
-            places, persistable_vars, self._graphs,
+            places, persistable_vars,
             cpt.to_text(loss_name) if loss_name else six.u(''), scope,
-            local_scopes, exec_strategy, build_strategy)
+            local_scopes, exec_strategy, build_strategy, self._graph)
 
         self.scope = scope
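Note (not part of the patch): after this change each Python entry point builds exactly one core.Graph from the program desc and passes it as the new trailing constructor argument, instead of handing over a list of graphs; when parallel-graph mode is enabled the C++ side splits that single graph per device via ParallelSSAGraphExecutor::SeparateMultiDevicesGraph() and recovers the program through Graph::OriginProgram(). Below is a minimal sketch of the resulting call shape, assuming a fluid Program `main`, a list of core place objects `places`, and a core scope `scope` prepared elsewhere; the helper name and the default strategies are hypothetical, not part of the patch:

    import six
    from paddle.fluid import core
    from paddle.fluid import compat as cpt

    def make_parallel_executor(main, places, scope, loss_name=None):
        # One graph for the whole program; the C++ side decides whether to
        # run it with ParallelSSAGraphExecutor or a threaded executor.
        graph = core.Graph(main.desc)
        exec_strategy = core.ParallelExecutor.ExecutionStrategy()
        build_strategy = core.ParallelExecutor.BuildStrategy()
        persistable_vars = set([
            cpt.to_text(v.name) for v in main.list_vars()
            if v.persistable and v.type != core.VarDesc.VarType.RAW
        ])
        return core.ParallelExecutor(
            places, persistable_vars,
            cpt.to_text(loss_name) if loss_name else six.u(''), scope,
            [],  # empty local_scopes: the executor creates one per place
            exec_strategy, build_strategy, graph)  # graph is now the last argument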