From 26e32e095a6c4d643fccf2cea7675b075aad1730 Mon Sep 17 00:00:00 2001 From: Xin Pan Date: Thu, 17 Jan 2019 17:34:01 +0800 Subject: [PATCH] allow compiler to use graph test=develop --- paddle/fluid/API.spec | 2 +- .../fluid/framework/details/build_strategy.cc | 26 +-- .../fluid/framework/details/build_strategy.h | 2 +- .../fast_threaded_ssa_graph_executor.cc | 9 +- .../fast_threaded_ssa_graph_executor.h | 4 +- .../details/memory_optimize_helper_test.cc | 26 +-- .../framework/details/memory_optimize_pass.cc | 3 +- .../details/parallel_ssa_graph_executor.cc | 9 +- .../details/parallel_ssa_graph_executor.h | 4 +- .../details/threaded_ssa_graph_executor.cc | 9 +- .../details/threaded_ssa_graph_executor.h | 4 +- paddle/fluid/framework/ir/graph.h | 16 ++ paddle/fluid/framework/parallel_executor.cc | 154 ++++++++++--- paddle/fluid/framework/parallel_executor.h | 9 +- paddle/fluid/pybind/ir.cc | 3 +- paddle/fluid/pybind/pybind.cc | 10 +- python/paddle/fluid/compiler.py | 83 +++++-- .../slim/unitest/test_quantization_pass.py | 204 ++++++++++++++++++ python/paddle/fluid/executor.py | 1 + python/paddle/fluid/framework.py | 3 +- python/paddle/fluid/parallel_executor.py | 5 +- 21 files changed, 460 insertions(+), 126 deletions(-) create mode 100644 python/paddle/fluid/contrib/slim/unitest/test_quantization_pass.py diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index f24cf96cce3..711c7481d24 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -43,7 +43,7 @@ paddle.fluid.AsyncExecutor.init_worker ArgSpec(args=['self', 'dist_desc', 'start paddle.fluid.AsyncExecutor.run ArgSpec(args=['self', 'program', 'data_feed', 'filelist', 'thread_num', 'fetch', 'mode', 'debug'], varargs=None, keywords=None, defaults=('', False)) paddle.fluid.AsyncExecutor.save_model ArgSpec(args=['self', 'save_path'], varargs=None, keywords=None, defaults=None) paddle.fluid.AsyncExecutor.stop ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) -paddle.fluid.CompiledProgram.__init__ ArgSpec(args=['self', 'program'], varargs=None, keywords=None, defaults=None) +paddle.fluid.CompiledProgram.__init__ ArgSpec(args=['self', 'program_or_graph'], varargs=None, keywords=None, defaults=None) paddle.fluid.CompiledProgram.with_data_parallel ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from'], varargs=None, keywords=None, defaults=(None, None, None, None)) paddle.fluid.CompiledProgram.with_inference_optimize ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=None) paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ParallelExecutor.ExecutionStrategy) -> None diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index 8c6c9f35e84..231abac9719 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -171,7 +171,8 @@ bool BuildStrategy::IsMultiDevPass(const std::string &pass_name) const { } std::unique_ptr BuildStrategy::Apply( - const ProgramDesc &main_program, const std::vector &places, + std::unique_ptr graph, + const std::vector &places, const std::string &loss_var_name, const std::vector &local_scopes, const size_t &nranks, #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) @@ -182,7 +183,7 @@ std::unique_ptr BuildStrategy::Apply( // Create a default one if not finalized by user. CreatePassesFromStrategy(false); - std::unique_ptr graph(new ir::Graph(main_program)); + std::vector all_ops = graph->OriginProgram().Block(0).AllOps(); for (std::shared_ptr &pass : pass_builder_->AllPasses()) { if (IsMultiDevPass(pass->Type())) { pass->Erase(kPlaces); @@ -204,37 +205,30 @@ std::unique_ptr BuildStrategy::Apply( if (graph->Has(kAllOpDescs)) { graph->Erase(kAllOpDescs); } - const std::vector *all_op_descs = - new std::vector(main_program.Block(0).AllOps()); - graph->Set>(kAllOpDescs, - all_op_descs); // take ownership + + graph->SetNotOwned>(kAllOpDescs, + &all_ops); // take ownership pass->Erase(kAllOpDescs); - pass->SetNotOwned>(kAllOpDescs, all_op_descs); + pass->SetNotOwned>(kAllOpDescs, &all_ops); } else if (pass->Type() == "sequential_execution_pass") { LOG(INFO) << "set enable_sequential_execution:" << enable_sequential_execution_; pass->Erase(kAllOpDescs); - pass->Set>( - kAllOpDescs, - new std::vector(main_program.Block(0).AllOps())); + pass->SetNotOwned>(kAllOpDescs, &all_ops); } else if (pass->Type() == "all_reduce_deps_pass") { LOG(INFO) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this) << ", num_trainers:" << num_trainers_; pass->Erase(kAllOpDescs); - pass->Set>( - kAllOpDescs, - new std::vector(main_program.Block(0).AllOps())); + pass->SetNotOwned>(kAllOpDescs, &all_ops); } else if (pass->Type() == "inplace_pass") { if (graph->Has(kAllOpDescs)) { graph->Erase(kAllOpDescs); } - graph->Set>( - kAllOpDescs, - new std::vector(main_program.Block(0).AllOps())); + graph->SetNotOwned>(kAllOpDescs, &all_ops); } else if (pass->Type() == "fuse_relu_depthwise_conv_pass") { if (!use_cuda) { LOG(WARNING) << "fuse_relu_depthwise_conv_pass is only supported on " diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index e62e3edcef7..0ea71aa3b75 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -114,7 +114,7 @@ struct BuildStrategy { // Apply the passes built by the pass_builder_. The passes will be // applied to the Program and output an ir::Graph. - std::unique_ptr Apply(const ProgramDesc &main_program, + std::unique_ptr Apply(std::unique_ptr graph, const std::vector &places, const std::string &loss_var_name, const std::vector &local_scopes, diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc index 872bc5d654c..f0364670581 100644 --- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc @@ -24,12 +24,11 @@ namespace details { FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, - const std::vector &places, - std::unique_ptr &&graph) + const std::vector &places, ir::Graph *graph) : strategy_(strategy), local_scopes_(local_scopes), places_(places), - graph_(std::move(graph)), + graph_(graph), pool_(strategy.num_threads_), prepare_pool_(1), // add one more thread for generate op_deps fetch_ctxs_(places) { @@ -110,14 +109,14 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run( } } if (exception_.IsCaught()) { - ClearFetchOp(graph_.get(), &fetch_ops); + ClearFetchOp(graph_, &fetch_ops); exception_.ReThrow(); } } num_complete += num_comp; } // Wait FetchOps. - ClearFetchOp(graph_.get(), &fetch_ops); + ClearFetchOp(graph_, &fetch_ops); return fetches; } diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h index c3a8b854234..970298950cc 100644 --- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h @@ -32,7 +32,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { FastThreadedSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph); + ir::Graph *graph); FeedFetchList Run(const std::vector &fetch_tensors) override; const ir::Graph &Graph() const override; @@ -40,7 +40,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { ExecutionStrategy strategy_; std::vector local_scopes_; std::vector places_; - std::unique_ptr graph_; + ir::Graph *graph_; std::unordered_map op_deps_; std::vector bootstrap_ops_; diff --git a/paddle/fluid/framework/details/memory_optimize_helper_test.cc b/paddle/fluid/framework/details/memory_optimize_helper_test.cc index 3cfe297a73c..5389e76e0c6 100644 --- a/paddle/fluid/framework/details/memory_optimize_helper_test.cc +++ b/paddle/fluid/framework/details/memory_optimize_helper_test.cc @@ -228,9 +228,6 @@ TEST(CFGGraph, IRGraph) { // prepare ir graph auto prog = FillProgramDesc(); ir::Graph graph(prog); - const std::vector* all_op_descs = - new std::vector(prog.Block(0).AllOps()); - graph.Set(details::kAllOpDescs, all_op_descs); // take ownership ControlFlowGraph cfg(graph); cfg.LiveVariableAnalysis(); @@ -256,9 +253,6 @@ TEST(CFGGraph, IRGraph) { TEST(SortOpLikeDescOrder, NormalTest) { auto prog = FillProgramDesc(); ir::Graph graph(prog); - const std::vector* all_op_descs = - new std::vector(prog.Block(0).AllOps()); - graph.Set(details::kAllOpDescs, all_op_descs); // take ownership auto nodes = SortOpLikeDescOrder(graph); auto op_descs = prog.Block(0).AllOps(); @@ -273,9 +267,6 @@ TEST(SortOpLikeDescOrder, NormalTest) { TEST(SortOpLikeDescOrder, RemoveOpDesc) { auto prog = FillProgramDesc(); ir::Graph graph(prog); - const std::vector* all_op_descs = - new std::vector(prog.Block(0).AllOps()); - graph.Set(details::kAllOpDescs, all_op_descs); // take ownership auto nodes = graph.Nodes(); auto op_descs = prog.Block(0).AllOps(); ir::Node* found_node = nullptr; @@ -324,8 +315,6 @@ TEST(SortOpLikeDescOrder, RemoveOpDesc) { // 3. add some op_desc TEST(SortOpLikeDescOrder, AddOpDesc) { auto prog = FillProgramDesc(); - const std::vector* all_op_descs = - new std::vector(prog.Block(0).AllOps()); ir::Graph graph(prog); auto find_node_in_graph = [&](std::string s) { @@ -342,9 +331,7 @@ TEST(SortOpLikeDescOrder, AddOpDesc) { // cached desc different with real one // mimic the intermidiete pass modify the programdesc. - graph.Set(details::kAllOpDescs, all_op_descs); // take ownership - - auto op_descs = prog.Block(0).AllOps(); + std::vector op_descs = graph.OriginProgram().Block(0).AllOps(); auto op = prog.MutableBlock(0)->AppendOp(); prog.MutableBlock(0)->Var("d1")->SetType(proto::VarType::LOD_TENSOR); @@ -376,9 +363,6 @@ TEST(SortOpLikeDescOrder, AddOpDesc) { TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) { auto prog = FillProgramDesc(); ir::Graph graph(prog); - const std::vector* all_op_descs = - new std::vector(prog.Block(0).AllOps()); - graph.Set(details::kAllOpDescs, all_op_descs); // take ownership auto find_node_in_graph = [&](std::string s) { ir::Node* ret = nullptr; @@ -392,8 +376,9 @@ TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) { return ret; }; + std::vector op_descs = graph.OriginProgram().Block(0).AllOps(); + // remove sum node - auto op_descs = prog.Block(0).AllOps(); ir::Node* found_node = nullptr; auto nodes = graph.Nodes(); for (auto node : nodes) { @@ -454,9 +439,7 @@ TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) { TEST(SortOpLikeDescOrder, AddAndReplaceOpDescInplace) { auto prog = FillProgramDesc(); ir::Graph graph(prog); - const std::vector* all_op_descs = - new std::vector(prog.Block(0).AllOps()); - graph.Set(details::kAllOpDescs, all_op_descs); // take ownership + std::vector op_descs = graph.OriginProgram().Block(0).AllOps(); auto find_node_in_graph = [&](std::string s) { ir::Node* ret = nullptr; @@ -470,7 +453,6 @@ TEST(SortOpLikeDescOrder, AddAndReplaceOpDescInplace) { return ret; }; - auto op_descs = prog.Block(0).AllOps(); // add node auto op = prog.MutableBlock(0)->AppendOp(); prog.MutableBlock(0)->Var("d1")->SetType(proto::VarType::LOD_TENSOR); diff --git a/paddle/fluid/framework/details/memory_optimize_pass.cc b/paddle/fluid/framework/details/memory_optimize_pass.cc index fd02bc4697e..20d4865887c 100644 --- a/paddle/fluid/framework/details/memory_optimize_pass.cc +++ b/paddle/fluid/framework/details/memory_optimize_pass.cc @@ -336,5 +336,4 @@ void MemoryOptimizePass::RenameVarInGraphNode(const std::string& var, } // namespace paddle REGISTER_PASS(memory_optimize_pass, - paddle::framework::details::MemoryOptimizePass) - .RequireGraphAttr(paddle::framework::details::kAllOpDescs); + paddle::framework::details::MemoryOptimizePass); diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc index 4c8f69c68ce..18b455cc6c3 100644 --- a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc @@ -20,8 +20,7 @@ namespace framework { namespace details { std::vector> -ParallelSSAGraphExecutor::SeparateMultiDevicesGraph( - std::unique_ptr &&graph) { +ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(ir::Graph* graph) { std::vector> graphs; graphs.reserve(places_.size()); for (size_t i = 0; i < places_.size(); ++i) { @@ -78,7 +77,7 @@ ParallelSSAGraphExecutor::SeparateMultiDevicesGraph( ParallelSSAGraphExecutor::ParallelSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - const framework::ProgramDesc &main_prog, std::unique_ptr &&graph) + const framework::ProgramDesc &main_prog, ir::Graph* graph) : strategy_(std::move(strategy)), local_scopes_(std::move(local_scopes)), pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr), @@ -86,7 +85,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor( main_prog_(main_prog), // TODO(Yancey1989): Copying graphs is not safely since it deleted the // attrs. - graphs_(SeparateMultiDevicesGraph(std::move(graph))) { + graphs_(SeparateMultiDevicesGraph(graph)) { PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); auto seq_allreduce_pass = @@ -107,7 +106,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor( << " to run the operators of the graph on each device."; for (size_t i = 0; i < places.size(); ++i) { executors_.emplace_back(new details::ThreadedSSAGraphExecutor( - strategy_, local_scopes_, {places_[i]}, std::move(graphs_.at(i)))); + strategy_, local_scopes_, {places_[i]}, graphs_.at(i).get())); } } diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.h b/paddle/fluid/framework/details/parallel_ssa_graph_executor.h index 1c35d45fdd3..a1547878a58 100644 --- a/paddle/fluid/framework/details/parallel_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.h @@ -32,7 +32,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor { const std::vector &local_scopes, const std::vector &places, const framework::ProgramDesc &main_prog, - std::unique_ptr &&graph); + ir::Graph* graph); ~ParallelSSAGraphExecutor() final = default; const ir::Graph &Graph() const override { return *graphs_[0]; } @@ -41,7 +41,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor { private: std::vector> SeparateMultiDevicesGraph( - std::unique_ptr &&graph); + ir::Graph* graph); ExecutionStrategy strategy_; std::vector local_scopes_; diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index 72acc337b7c..9ba295a2b06 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -23,9 +23,8 @@ namespace framework { namespace details { ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, - const std::vector &places, - std::unique_ptr &&graph) - : graph_(std::move(graph)), + const std::vector &places, ir::Graph *graph) + : graph_(graph), pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) : nullptr), local_scopes_(local_scopes), @@ -110,7 +109,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( for (auto &run_op_future : run_op_futures_) { run_op_future.wait(); } - ClearFetchOp(graph_.get(), &fetch_ops); + ClearFetchOp(graph_, &fetch_ops); exception_holder_.ReThrow(); } else { continue; @@ -135,7 +134,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( } PADDLE_ENFORCE(ready_ops.empty()); // Wait FetchOps. - ClearFetchOp(graph_.get(), &fetch_ops); + ClearFetchOp(graph_, &fetch_ops); return fetch_data; } diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index 24da56c09e3..0867f621048 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -41,7 +41,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph); + ir::Graph *graph); const ir::Graph &Graph() const override { return *graph_; } // Run a SSAGraph by a thread pool @@ -55,7 +55,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { details::OpHandleBase *op); private: - std::unique_ptr graph_; + ir::Graph *graph_; std::unique_ptr<::ThreadPool> pool_; std::vector local_scopes_; std::vector places_; diff --git a/paddle/fluid/framework/ir/graph.h b/paddle/fluid/framework/ir/graph.h index 296f3b83961..6b8115b295f 100644 --- a/paddle/fluid/framework/ir/graph.h +++ b/paddle/fluid/framework/ir/graph.h @@ -195,6 +195,22 @@ class Graph { return nullptr; } +<<<<<<< HEAD +======= + // Returns reference to the original program. + // WARN: After a series of passes, the current graph can be quite + // different from OriginProgram. Caller shouldn't assume much from + // the returned OriginProgram. + const ProgramDesc &OriginProgram() const { return program_; } + + void ResolveHazard( + const std::map> &var_nodes); + + private: + std::map> InitFromProgram( + const ProgramDesc &program); + +>>>>>>> polish // This method takes ownership of `node`. ir::Node *AddNode(ir::Node *node) { PADDLE_ENFORCE(node_set_.find(node) == node_set_.end()); diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 56da5660095..2e68a2dd0fa 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -184,7 +184,7 @@ std::vector &ParallelExecutor::GetLocalScopes() { ParallelExecutor::ParallelExecutor( const std::vector &places, const std::unordered_set &bcast_vars, - const ProgramDesc &main_program, const std::string &loss_var_name, + const std::vector &graphs, const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy) : member_(new ParallelExecutorPrivate(places)) { @@ -216,15 +216,34 @@ ParallelExecutor::ParallelExecutor( } } +<<<<<<< HEAD + std::unique_ptr temp_owned_graph(graph); + // FIXME(Yancey1989): parallel graph mode get better performance // in GPU allreduce distributed training. Need an elegant way to // choice the execution strategy. build_strategy.enable_parallel_graph_ = - EnableParallelGraphExecution(main_program, exec_strategy, build_strategy); + EnableParallelGraphExecution(*temp_owned_graph, exec_strategy, build_strategy); if (build_strategy.enable_parallel_graph_) VLOG(0) << "The Executor would execute the graph by ParallelGraph " "Execution which can get better performance," << "you can force it off by env FLAGS_enable_parallel_graph=0"; +======= + // TODO(panyx0718): Update pass interface so we don't need this here. + std::vector> temp_owned_graphs; + for (ir::Graph *g : graphs) { + temp_owned_graphs.emplace_back(g); + } +<<<<<<< HEAD +>>>>>>> fix parallel graph mode program + +======= + bool parallel_graphs = (temp_owned_graphs.size() > 1); + if (parallel_graphs) { + PADDLE_ENFORCE_EQ(temp_owned_graphs.size(), places.size()); + } + VLOG(1) << "Enable ParallelGraph Execution: " << parallel_graphs; +>>>>>>> polish if (member_->use_cuda_) { // Bcast Parameters to all GPUs @@ -236,7 +255,7 @@ ParallelExecutor::ParallelExecutor( if (nccl_id_var != nullptr) { nccl_id = nccl_id_var->GetMutable(); } - if (build_strategy.enable_parallel_graph_ && member_->nranks_ > 1UL) { + if (parallel_graphs && member_->nranks_ > 1UL) { if (nccl_id == nullptr) { local_nccl_id_.reset(new ncclUniqueId()); platform::dynload::ncclGetUniqueId(local_nccl_id_.get()); @@ -258,44 +277,101 @@ ParallelExecutor::ParallelExecutor( // Step 2. Convert main_program to SSA form and dependency graph. Also, insert // ncclOp +<<<<<<< HEAD std::unique_ptr graph; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, + + temp_owned_graph = build_strategy.Apply(std::move(temp_owned_graph), member_->places_, loss_var_name, member_->local_scopes_, member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_.get()); #else - graph = build_strategy.Apply(main_program, member_->places_, loss_var_name, + temp_owned_graph = build_strategy.Apply(std::move(temp_owned_graph), member_->places_, loss_var_name, member_->local_scopes_, member_->nranks_, member_->use_cuda_); + +======= + std::vector compiled_graphs; +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) + if (parallel_graphs) { + for (size_t i = 0; i < member_->places_.size(); ++i) { + auto temp_owned_graph = build_strategy.Apply( + std::move(temp_owned_graphs[i]), {member_->places_[i]}, loss_var_name, + {member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_, + member_->nccl_ctxs_.get()); + compiled_graphs.push_back(temp_owned_graph.release()); + } + } else { + auto temp_owned_graph = build_strategy.Apply( + std::move(temp_owned_graphs[0]), member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, member_->use_cuda_, + member_->nccl_ctxs_.get()); + compiled_graphs.push_back(temp_owned_graph.release()); + } +#else + auto temp_owned_graph = build_strategy.Apply( + std::move(temp_owned_graphs[0]), member_->places_, loss_var_name, + member_->local_scopes_, member_->nranks_, member_->use_cuda_); + compiled_graphs.push_back(temp_owned_graph.release()); +>>>>>>> fix parallel graph mode program #endif auto max_memory_size = GetEagerDeletionThreshold(); VLOG(10) << "Eager Deletion Threshold " << static_cast(max_memory_size) / (1 << 30); if (max_memory_size >= 0) { +<<<<<<< HEAD graph = member_->PrepareGCAndRefCnts(std::move(graph), - static_cast(max_memory_size)); + static_cast(max_memory_size)).release(); +======= + for (size_t i = 0; i < graphs.size(); ++i) { + compiled_graphs[i] = + member_ + ->PrepareGCAndRefCnts( + std::unique_ptr(compiled_graphs[i]), + static_cast(max_memory_size)) + .release(); + } +>>>>>>> fix parallel graph mode program } // Step 3. Create vars in each scope. Passes may also create new vars. // skip control vars and empty vars std::vector var_infos; +<<<<<<< HEAD for (auto &node : graph->Nodes()) { if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { var_infos.emplace_back(); var_infos.back().name_ = node->Var()->Name(); var_infos.back().type_ = node->Var()->GetType(); var_infos.back().persistable_ = node->Var()->Persistable(); +======= + for (auto &graph : compiled_graphs) { + for (auto &node : graph->Nodes()) { + if (node->IsVar() && !node->IsCtrlVar() && node->Var()) { + var_infos.emplace_back(); + var_infos.back().name_ = node->Var()->Name(); + var_infos.back().type_ = node->Var()->GetType(); + var_infos.back().persistable_ = node->Var()->Persistable(); + } +>>>>>>> fix parallel graph mode program } } // If the loss_var_name is given, the number of graph should be only one. if (loss_var_name.size()) { +<<<<<<< HEAD size_t graph_num = ir::GraphNum(*graph); +======= + size_t graph_num = ir::GraphNum(*compiled_graphs[0]); +>>>>>>> fix parallel graph mode program if (graph_num > 1) { LOG(WARNING) << "The number of graph should be only one, " "but the current graph has " +<<<<<<< HEAD << ir::GraphNum(*graph) +======= + << ir::GraphNum(*compiled_graphs[0]) +>>>>>>> fix parallel graph mode program << " sub_graphs. If you want to see the nodes of the " "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' " "to specify the output dir. NOTES: if you not do training, " @@ -303,26 +379,42 @@ ParallelExecutor::ParallelExecutor( } } +<<<<<<< HEAD if (build_strategy.enable_parallel_graph_) { #ifdef PADDLE_WITH_CUDA // TODO(Yancey1989): Remove passing in the main_program when // allreduce_seq_pass doesn't need it as the attr. +======= + if (parallel_graphs) { +>>>>>>> polish member_->executor_.reset(new details::ParallelSSAGraphExecutor( +<<<<<<< HEAD exec_strategy, member_->local_scopes_, member_->places_, main_program, - std::move(graph))); + graph)); #else PADDLE_THROW( "Paddle should be compiled with CUDA for ParallelGraph Execution."); #endif + } else { + if (exec_strategy.type_ == ExecutionStrategy::kDefault) { + member_->executor_.reset(new details::ThreadedSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->places_, graph)); + } else { + member_->executor_.reset(new details::FastThreadedSSAGraphExecutor( + exec_strategy, member_->local_scopes_, member_->places_, graph)); +======= + exec_strategy, member_->local_scopes_, member_->places_, + compiled_graphs)); } else { if (exec_strategy.type_ == ExecutionStrategy::kDefault) { member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, - std::move(graph))); + compiled_graphs[0])); } else { member_->executor_.reset(new details::FastThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, member_->places_, - std::move(graph))); + compiled_graphs[0])); +>>>>>>> fix parallel graph mode program } } @@ -452,24 +544,33 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes( } } -bool ParallelExecutor::EnableParallelGraphExecution( - const ProgramDesc &main_program, const ExecutionStrategy &exec_strategy, - const BuildStrategy &build_strategy) const { +ParallelExecutor::~ParallelExecutor() { + for (auto &p : member_->places_) { + platform::DeviceContextPool::Instance().Get(p)->Wait(); + } + delete member_; +} + +bool EnableParallelGraphExecution(const ir::Graph &graph, + const ExecutionStrategy &exec_strategy, + const BuildStrategy &build_strategy) { if (!FLAGS_enable_parallel_graph) return false; bool enable_parallel_graph = true; - // TODO(Yancey1989): support sparse update in ParallelGraph mode. - for (auto &var_desc : main_program.Block(0).AllVars()) { - if (var_desc->GetType() == proto::VarType::SELECTED_ROWS) { - enable_parallel_graph = false; - } - } - // TODO(Yancey1989): support pserver mode - for (auto &op_desc : main_program.Block(0).AllOps()) { - if (op_desc->Type() == "send" || op_desc->Type() == "recv") { - enable_parallel_graph = false; - break; + for (ir::Node *node : graph.Nodes()) { + if (node->IsVar() && node->Var()) { + // TODO(Yancey1989): support sparse update in ParallelGraph mode. + if (node->Var()->GetType() == proto::VarType::SELECTED_ROWS) { + enable_parallel_graph = false; + break; + } + } else if (node->IsOp() && node->Op()) { + // TODO(Yancey1989): support pserver mode + if (node->Op()->Type() == "send" || node->Op()->Type() == "recv") { + enable_parallel_graph = false; + break; + } } } @@ -481,13 +582,6 @@ bool ParallelExecutor::EnableParallelGraphExecution( return enable_parallel_graph; } -ParallelExecutor::~ParallelExecutor() { - for (auto &p : member_->places_) { - platform::DeviceContextPool::Instance().Get(p)->Wait(); - } - delete member_; -} - } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 121bbd55ad5..a6c0d65c016 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -46,7 +46,7 @@ class ParallelExecutor { public: explicit ParallelExecutor(const std::vector &places, const std::unordered_set &bcast_vars, - const ProgramDesc &main_program, + const std::vector &graphs, const std::string &loss_var_name, Scope *scope, const std::vector &local_scopes, const ExecutionStrategy &exec_strategy, @@ -71,9 +71,6 @@ class ParallelExecutor { private: void BCastParamsToDevices(const std::unordered_set &vars) const; - bool EnableParallelGraphExecution(const ProgramDesc &main_program, - const ExecutionStrategy &exec_strategy, - const BuildStrategy &build_strategy) const; ParallelExecutorPrivate *member_; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) @@ -81,5 +78,9 @@ class ParallelExecutor { #endif }; +bool EnableParallelGraphExecution(const ir::Graph &graph, + const ExecutionStrategy &exec_strategy, + const BuildStrategy &build_strategy); + } // namespace framework } // namespace paddle diff --git a/paddle/fluid/pybind/ir.cc b/paddle/fluid/pybind/ir.cc index 1cd1be8e8d9..069750e2406 100644 --- a/paddle/fluid/pybind/ir.cc +++ b/paddle/fluid/pybind/ir.cc @@ -101,7 +101,8 @@ void BindGraph(py::module *m) { [](Graph &self, Node &node) { return self.RemoveNode(&node); }) .def("retrieve_node", &Graph::RetrieveNode, return_value_policy::reference) - .def("resolve_hazard", &Graph::ResolveHazard); + .def("resolve_hazard", &Graph::ResolveHazard) + .def("origin_program_desc", &Graph::OriginProgram); } void BindNode(py::module *m) { diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index d8e57a1ac6c..ccbdb1ab110 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -976,6 +976,9 @@ All parameter, weight, gradient are variables in Paddle. [](ir::PassBuilder &self, size_t idx) { self.RemovePass(idx); }); // -- python binds for parallel executor. + m.def("_enable_parallel_graph_execution", + framework::EnableParallelGraphExecution); + py::class_ pe(m, "ParallelExecutor"); py::class_ exec_strategy(pe, "ExecutionStrategy", R"DOC( ExecutionStrategy allows the user to more preciously control how to run @@ -1213,9 +1216,10 @@ All parameter, weight, gradient are variables in Paddle. cannot be updated after being finalized.)DOC"); pe.def(py::init &, - const std::unordered_set &, const ProgramDesc &, - const std::string &, Scope *, std::vector &, - const ExecutionStrategy &, const BuildStrategy &>()) + const std::unordered_set &, + const std::vector &, const std::string &, + Scope *, std::vector &, const ExecutionStrategy &, + const BuildStrategy &>()) // NOTE: even we return a vec* to Python use reference policy. // We still cannot get local_scope from this vector, since the element // of vec will be freed by Python GC. We can only return Scope* diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py index fa79db19ee8..acea09e9575 100644 --- a/python/paddle/fluid/compiler.py +++ b/python/paddle/fluid/compiler.py @@ -17,6 +17,7 @@ import os import six import sys from .. import compat as cpt +from . import framework from . import core @@ -36,7 +37,7 @@ def _place_obj(place): class CompiledProgram(object): """ - Compiles a Program for execution. + Compiles to Graph for execution. 1. Users first create the program with layers. 2. Optionally, users use CompiledProgram to optimize the program before run. @@ -51,7 +52,7 @@ class CompiledProgram(object): Example: .. code-block:: python - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + place = fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup) compiled_prog = compiler.CompiledProgram(main).with_data_parallel( @@ -62,11 +63,25 @@ class CompiledProgram(object): fetch_list=[loss.name]) Args: - program: Program instance that contains the model logic. + program_or_graph (Graph|Program): If it's Program, it will be first + lowered to a graph for further optimizations. If it's a graph + (potentially optimized before), it will be directly used for + further optimizations. Note: graph is only supported when compiled + with with_data_parallel option. """ - def __init__(self, program): - self._program = program + def __init__(self, program_or_graph): + if isinstance(program_or_graph, core.Graph): + self._graph = program_or_graph + self._program = None + elif isinstance(program_or_graph, framework.Program): + self._graph = core.Graph(program_or_graph.desc) + self._program = program_or_graph + else: + raise ValueError("Wrong program_to_graph type: %s" % + type(program_or_graph)) + + self._program_desc = self._graph.origin_program_desc() self._scope = None self._place = None self._executor = None @@ -101,6 +116,7 @@ class CompiledProgram(object): self """ assert not self._is_data_parallel, "Already compiled with parallel." + assert not self._is_inference, "Cannot compile both data parallel and inference" self._is_data_parallel = True self._build_strategy = build_strategy self._exec_strategy = exec_strategy @@ -120,11 +136,13 @@ class CompiledProgram(object): Returns: self """ + assert not self._is_data_parallel, "Cannot compile both data parallel and inference." + assert not self._is_inference, "Already compiled with inference" + assert any([ isinstance(config, InferNativeConfig), isinstance(config, InferAnalysisConfig) ]) - self._is_data_parallel = False self._is_inference = True self._infer_config = config return self @@ -173,37 +191,56 @@ class CompiledProgram(object): os.environ.get('CPU_NUM', multiprocessing.cpu_count())) self._exec_strategy.num_threads = cpu_num * 2 - trainers_endpoints = self._program._trainers_endpoints - # FIXME(dzhwinter): enable_inplace should be after memory_optimize # if turn on python memory optimize, turn off the inplace_pass. if self._build_strategy.memory_optimize is None: - self._build_strategy.memory_optimize = False if self._program._is_mem_optimized else True + self._build_strategy.memory_optimize = False if self._program and self._program._is_mem_optimized else True if self._build_strategy.enable_inplace is None: - self._build_strategy.enable_inplace = False if self._program._is_mem_optimized else True + self._build_strategy.enable_inplace = False if self._program and self._program._is_mem_optimized else True + + + # TODO(wuyi): trainer endpoings should be passed in through + # build_strategy, not program.xxx. + if self._program and self._build_strategy.num_trainers > 1 and \ + self._program._trainers_endpoints: + tps = self._program._trainers_endpoints - if self._build_strategy.num_trainers > 1 and trainers_endpoints: assert self._build_strategy.num_trainers == len( - trainers_endpoints), "num_trainers == len(end_points)" - self._build_strategy.trainers_endpoints = trainers_endpoints - - self._persistable_vars = set([ - cpt.to_text(v.name) - for v in [ - var for var in self._program.list_vars() - if var.persistable and var.type != core.VarDesc.VarType.RAW - ] - ]) + tps), "num_trainers == len(end_points)" + self._build_strategy.trainers_endpoints = tps + + self._persistable_vars = [] + for block_id in range(self._program_desc.num_blocks()): + bdesc = self._program_desc.block(block_id) + self._persistable_vars.extend([ + cpt.to_text(v.name()) for v in bdesc.all_vars() + if v.persistable() and v.type() != core.VarDesc.VarType.RAW + ]) places = list(map(_place_obj, self._places)) + + # FIXME(Yancey1989): parallel graph mode get better performance + # in GPU allreduce distributed training. Need an elegant way to + # choice the execution strategy. + enable_parallel_graph = \ + core._enable_parallel_graph_execution(self._graph, + self._exec_strategy, + self._build_strategy) and \ + self._program # only supported if compile program not graph. + + self._pe_graphs = [self._graph] + if enable_parallel_graph: + for _ in range(len(places) - 1): + self._pe_graphs.append(core.Graph(self._program_desc)) + return core.ParallelExecutor( - places, self._persistable_vars, self._program.desc, + places, + set(self._persistable_vars), self._pe_graphs, cpt.to_text(self._loss_name) if self._loss_name else six.u(''), self._scope, self._local_scopes, self._exec_strategy, self._build_strategy) def _compile_inference(self): - assert self._is_data_parallel is False return core.create_paddle_predictor(self._infer_config) def _compile(self, scope, place): diff --git a/python/paddle/fluid/contrib/slim/unitest/test_quantization_pass.py b/python/paddle/fluid/contrib/slim/unitest/test_quantization_pass.py new file mode 100644 index 00000000000..4f3fee09459 --- /dev/null +++ b/python/paddle/fluid/contrib/slim/unitest/test_quantization_pass.py @@ -0,0 +1,204 @@ +# copyright (c) 2018 paddlepaddle authors. all rights reserved. +# +# licensed under the apache license, version 2.0 (the "license"); +# you may not use this file except in compliance with the license. +# you may obtain a copy of the license at +# +# http://www.apache.org/licenses/license-2.0 +# +# unless required by applicable law or agreed to in writing, software +# distributed under the license is distributed on an "as is" basis, +# without warranties or conditions of any kind, either express or implied. +# see the license for the specific language governing permissions and +# limitations under the license. + +import unittest +import random +import numpy as np +import paddle.fluid as fluid +import six +from paddle.fluid.framework import Program +from paddle.fluid.framework import IrGraph +from paddle.fluid.contrib.slim.quantization import QuantizationTransformPass +from paddle.fluid import core + + +def linear_fc(num): + data = fluid.layers.data(name='image', shape=[1, 28, 28], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + hidden = data + for _ in six.moves.xrange(num): + hidden = fluid.layers.fc(hidden, size=128, act='relu') + fc = fluid.layers.fc(input=hidden, size=10) + loss = fluid.layers.softmax_with_cross_entropy(fc, label=label) + loss = fluid.layers.mean(loss) + return loss + + +def residual_block(num): + def conv_bn_layer(input, + ch_out, + filter_size, + stride, + padding, + act='relu', + bias_attr=False): + tmp = fluid.layers.conv2d( + input=input, + filter_size=filter_size, + num_filters=ch_out, + stride=stride, + padding=padding, + act=None, + bias_attr=bias_attr) + return fluid.layers.batch_norm(input=tmp, act=act) + + data = fluid.layers.data(name='image', shape=[1, 28, 28], dtype='float32') + label = fluid.layers.data(name='label', shape=[1], dtype='int64') + hidden = data + for _ in six.moves.xrange(num): + conv = conv_bn_layer(hidden, 16, 3, 1, 1, act=None, bias_attr=True) + short = conv_bn_layer(hidden, 16, 1, 1, 0, act=None) + hidden = fluid.layers.elementwise_add(x=conv, y=short, act='relu') + fc = fluid.layers.fc(input=hidden, size=10) + loss = fluid.layers.softmax_with_cross_entropy(fc, label) + loss = fluid.layers.mean(loss) + return loss + + +class TestQuantizationTransformPass(unittest.TestCase): + def setUp(self): + self.quantizable_op_and_inputs = { + 'conv2d': ['Input', 'Filter'], + 'depthwise_conv2d': ['Input', 'Filter'], + 'mul': ['X', 'Y'] + } + self.quantizable_grad_op_inputs = { + 'conv2d_grad': ['Input', 'Filter'], + 'depthwise_conv2d_grad': ['Input', 'Filter'], + 'mul_grad': ['X', 'Y'] + } + + def check_program(self, transform_pass, program): + quantized_ops = set() + for block in program.blocks: + for op in block.ops: + # check forward + if op.type in self.quantizable_op_and_inputs: + for arg_name in op.input_arg_names: + self.assertTrue( + arg_name.endswith('.quantized.dequantized')) + quantized_ops.add(arg_name) + + for op in block.ops: + # check backward + if op.type in self.quantizable_grad_op_inputs: + for pname in self.quantizable_grad_op_inputs[op.type]: + arg_name = op.input(pname)[0] + self.assertTrue( + arg_name.endswith('.quantized.dequantized')) + self.assertTrue(arg_name in quantized_ops) + + def linear_fc_quant(self, quant_type): + main = fluid.Program() + startup = fluid.Program() + with fluid.program_guard(main, startup): + loss = linear_fc(3) + opt = fluid.optimizer.Adam(learning_rate=0.001) + opt.minimize(loss) + exe = fluid.Executor(fluid.CPUPlace()) + graph = IrGraph(core.Graph(main.desc), for_test=False) + transform_pass = QuantizationTransformPass( + scope=fluid.global_scope(), + program_exe=exe, + activation_quantize_type=quant_type) + transform_pass.apply(graph) + marked_nodes = set() + for op in graph.all_ops(): + if op.name().find('quantize') > -1: + marked_nodes.add(op) + graph.draw('.', 'quantize_fc_' + quant_type, marked_nodes) + program = graph.to_program() + self.check_program(transform_pass, program) + val_graph = IrGraph(core.Graph(program.desc), for_test=False) + val_marked_nodes = set() + for op in val_graph.all_ops(): + if op.name().find('quantize') > -1: + val_marked_nodes.add(op) + val_graph.draw('.', 'val_fc_' + quant_type, val_marked_nodes) + + def test_linear_fc_quant_abs_max(self): + self.act_quant_op_type = 'fake_quantize_abs_max' + self.linear_fc_quant('abs_max') + + def test_linear_fc_quant_range_abs_max(self): + self.act_quant_op_type = 'fake_quantize_range_abs_max' + self.linear_fc_quant('range_abs_max') + + def residual_block_quant(self, quant_type): + main = fluid.Program() + startup = fluid.Program() + with fluid.program_guard(main, startup): + loss = residual_block(2) + opt = fluid.optimizer.Adam(learning_rate=0.001) + opt.minimize(loss) + exe = fluid.Executor(fluid.CPUPlace()) + graph = IrGraph(core.Graph(main.desc), for_test=False) + transform_pass = QuantizationTransformPass( + scope=fluid.global_scope(), + program_exe=exe, + activation_quantize_type=quant_type) + transform_pass.apply(graph) + marked_nodes = set() + for op in graph.all_ops(): + if op.name().find('quantize') > -1: + marked_nodes.add(op) + graph.draw('.', 'quantize_residual_' + quant_type, marked_nodes) + program = graph.to_program() + self.check_program(transform_pass, program) + val_graph = IrGraph(core.Graph(program.desc), for_test=False) + val_marked_nodes = set() + for op in val_graph.all_ops(): + if op.name().find('quantize') > -1: + val_marked_nodes.add(op) + val_graph.draw('.', 'val_residual_' + quant_type, val_marked_nodes) + + def test_residual_block_abs_max(self): + self.act_quant_op_type = 'fake_quantize_abs_max' + self.residual_block_quant('abs_max') + + def test_residual_block_range_abs_max(self): + self.act_quant_op_type = 'fake_quantize_range_abs_max' + self.residual_block_quant('range_abs_max') + + def test_execute_graph(self): + main = fluid.Program() + startup = fluid.Program() + with fluid.program_guard(main, startup): + loss = linear_fc(3) + opt = fluid.optimizer.Adam(learning_rate=0.0001) + opt.minimize(loss) + + exe = fluid.Executor(fluid.CPUPlace()) + graph = IrGraph(core.Graph(main.desc), for_test=False) + exe.run(startup) + binary = fluid.CompiledProgram(graph.graph).with_data_parallel( + loss_name=loss.name) + for i in range(10): + loss_val = exe.run(binary, + feed={ + 'image': np.ones( + [32, 784], dtype=np.float32), + 'label': np.ones( + [32, 1], dtype=np.int64) + }, + fetch_list=[loss]) + if i == 0: + start_loss = np.sum(loss_val) + elif i == 9: + end_loss = np.sum(loss_val) + self.assertLess(end_loss, start_loss) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index 8815911eaeb..d0cdb73841c 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -538,6 +538,7 @@ class Executor(object): else: # TODO(panyx0718): Can compile program to optimize executor # performance. + assert program._program, "CompiledProgram is compiled from graph, can only run with_data_parallel." return self._run( program._program, self._default_executor, diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 15367c724e5..72f1eae9542 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -2322,7 +2322,7 @@ class Program(object): @staticmethod def _construct_from_desc(desc): """ - Construct a program from program desc. + Construct a program from program desc. (Experiment) Args: desc(core.ProgramDesc): The program desc for constructing. @@ -2332,6 +2332,7 @@ class Program(object): """ p = Program() p.desc = desc + # TODO(wangzhen): Block.vars/ops are not filled, should fix it. p.blocks = [Block(p, i) for i in six.moves.range(p.desc.num_blocks())] p._sync_with_cpp() return p diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 8586670c248..1d513c6eadc 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -185,8 +185,11 @@ class ParallelExecutor(object): places = list(map(place_obj, self._places)) # step7: init ParallelExecutor + # ParallelExecutor API will be deprecated, don't support parallel graph. + self._graphs = [core.Graph(main.desc)] + self.executor = core.ParallelExecutor( - places, persistable_vars, main.desc, + places, persistable_vars, self._graphs, cpt.to_text(loss_name) if loss_name else six.u(''), scope, local_scopes, exec_strategy, build_strategy) -- GitLab