Commit 26e32e09 authored by Xin Pan

allow compiler to use graph

test=develop
Parent a7e7d952
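This change lets the compiler consume an ir::Graph directly: CompiledProgram now accepts either a Program or a Graph, the SSA graph executors take a non-owning ir::Graph* instead of std::unique_ptr<ir::Graph>, and EnableParallelGraphExecution becomes a free function that inspects the graph itself. A minimal sketch of the new user-facing path, adapted from the test_execute_graph case added below (the tiny model here is illustrative, not from the diff):

```python
import numpy as np
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.fluid.framework import IrGraph

# Build a program as usual, then lower it to a core.Graph.
main, startup = fluid.Program(), fluid.Program()
with fluid.program_guard(main, startup):
    image = fluid.layers.data(name='image', shape=[784], dtype='float32')
    hidden = fluid.layers.fc(image, size=128, act='relu')
    loss = fluid.layers.mean(fluid.layers.fc(hidden, size=1))
    fluid.optimizer.Adam(learning_rate=0.001).minimize(loss)

exe = fluid.Executor(fluid.CPUPlace())
exe.run(startup)

# New in this commit: CompiledProgram can wrap a graph directly.
graph = IrGraph(core.Graph(main.desc), for_test=False)
binary = fluid.CompiledProgram(graph.graph).with_data_parallel(
    loss_name=loss.name)
loss_val = exe.run(binary,
                   feed={'image': np.ones([8, 784], dtype=np.float32)},
                   fetch_list=[loss])
```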
...@@ -43,7 +43,7 @@ paddle.fluid.AsyncExecutor.init_worker ArgSpec(args=['self', 'dist_desc', 'start ...@@ -43,7 +43,7 @@ paddle.fluid.AsyncExecutor.init_worker ArgSpec(args=['self', 'dist_desc', 'start
paddle.fluid.AsyncExecutor.run ArgSpec(args=['self', 'program', 'data_feed', 'filelist', 'thread_num', 'fetch', 'mode', 'debug'], varargs=None, keywords=None, defaults=('', False)) paddle.fluid.AsyncExecutor.run ArgSpec(args=['self', 'program', 'data_feed', 'filelist', 'thread_num', 'fetch', 'mode', 'debug'], varargs=None, keywords=None, defaults=('', False))
paddle.fluid.AsyncExecutor.save_model ArgSpec(args=['self', 'save_path'], varargs=None, keywords=None, defaults=None) paddle.fluid.AsyncExecutor.save_model ArgSpec(args=['self', 'save_path'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.stop ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.AsyncExecutor.stop ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.CompiledProgram.__init__ ArgSpec(args=['self', 'program'], varargs=None, keywords=None, defaults=None) paddle.fluid.CompiledProgram.__init__ ArgSpec(args=['self', 'program_or_graph'], varargs=None, keywords=None, defaults=None)
paddle.fluid.CompiledProgram.with_data_parallel ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from'], varargs=None, keywords=None, defaults=(None, None, None, None)) paddle.fluid.CompiledProgram.with_data_parallel ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from'], varargs=None, keywords=None, defaults=(None, None, None, None))
paddle.fluid.CompiledProgram.with_inference_optimize ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=None) paddle.fluid.CompiledProgram.with_inference_optimize ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=None)
paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ParallelExecutor.ExecutionStrategy) -> None paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ParallelExecutor.ExecutionStrategy) -> None
......
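The only API-spec change above is the CompiledProgram constructor, whose parameter is renamed from program to program_or_graph. Both call styles below should satisfy the new ArgSpec (a sketch, not taken from the diff):

```python
import paddle.fluid as fluid
from paddle.fluid import core

prog = fluid.Program()
cp_from_program = fluid.CompiledProgram(prog)                 # unchanged path
cp_from_graph = fluid.CompiledProgram(core.Graph(prog.desc))  # newly allowed
```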
...@@ -171,7 +171,8 @@ bool BuildStrategy::IsMultiDevPass(const std::string &pass_name) const { ...@@ -171,7 +171,8 @@ bool BuildStrategy::IsMultiDevPass(const std::string &pass_name) const {
} }
std::unique_ptr<ir::Graph> BuildStrategy::Apply( std::unique_ptr<ir::Graph> BuildStrategy::Apply(
const ProgramDesc &main_program, const std::vector<platform::Place> &places, std::unique_ptr<ir::Graph> graph,
const std::vector<platform::Place> &places,
const std::string &loss_var_name, const std::vector<Scope *> &local_scopes, const std::string &loss_var_name, const std::vector<Scope *> &local_scopes,
const size_t &nranks, const size_t &nranks,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
...@@ -182,7 +183,7 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply( ...@@ -182,7 +183,7 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
// Create a default one if not finalized by user. // Create a default one if not finalized by user.
CreatePassesFromStrategy(false); CreatePassesFromStrategy(false);
std::unique_ptr<ir::Graph> graph(new ir::Graph(main_program)); std::vector<OpDesc *> all_ops = graph->OriginProgram().Block(0).AllOps();
for (std::shared_ptr<ir::Pass> &pass : pass_builder_->AllPasses()) { for (std::shared_ptr<ir::Pass> &pass : pass_builder_->AllPasses()) {
if (IsMultiDevPass(pass->Type())) { if (IsMultiDevPass(pass->Type())) {
pass->Erase(kPlaces); pass->Erase(kPlaces);
...@@ -204,37 +205,30 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply( ...@@ -204,37 +205,30 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
if (graph->Has(kAllOpDescs)) { if (graph->Has(kAllOpDescs)) {
graph->Erase(kAllOpDescs); graph->Erase(kAllOpDescs);
} }
const std::vector<OpDesc *> *all_op_descs =
new std::vector<OpDesc *>(main_program.Block(0).AllOps()); graph->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs,
graph->Set<const std::vector<OpDesc *>>(kAllOpDescs, &all_ops); // take ownership
all_op_descs); // take ownership
pass->Erase(kAllOpDescs); pass->Erase(kAllOpDescs);
pass->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs, all_op_descs); pass->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs, &all_ops);
} else if (pass->Type() == "sequential_execution_pass") { } else if (pass->Type() == "sequential_execution_pass") {
LOG(INFO) << "set enable_sequential_execution:" LOG(INFO) << "set enable_sequential_execution:"
<< enable_sequential_execution_; << enable_sequential_execution_;
pass->Erase(kAllOpDescs); pass->Erase(kAllOpDescs);
pass->Set<const std::vector<OpDesc *>>( pass->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs, &all_ops);
kAllOpDescs,
new std::vector<OpDesc *>(main_program.Block(0).AllOps()));
} else if (pass->Type() == "all_reduce_deps_pass") { } else if (pass->Type() == "all_reduce_deps_pass") {
LOG(INFO) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this) LOG(INFO) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this)
<< ", num_trainers:" << num_trainers_; << ", num_trainers:" << num_trainers_;
pass->Erase(kAllOpDescs); pass->Erase(kAllOpDescs);
pass->Set<const std::vector<OpDesc *>>( pass->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs, &all_ops);
kAllOpDescs,
new std::vector<OpDesc *>(main_program.Block(0).AllOps()));
} else if (pass->Type() == "inplace_pass") { } else if (pass->Type() == "inplace_pass") {
if (graph->Has(kAllOpDescs)) { if (graph->Has(kAllOpDescs)) {
graph->Erase(kAllOpDescs); graph->Erase(kAllOpDescs);
} }
graph->Set<const std::vector<OpDesc *>>( graph->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs, &all_ops);
kAllOpDescs,
new std::vector<OpDesc *>(main_program.Block(0).AllOps()));
} else if (pass->Type() == "fuse_relu_depthwise_conv_pass") { } else if (pass->Type() == "fuse_relu_depthwise_conv_pass") {
if (!use_cuda) { if (!use_cuda) {
LOG(WARNING) << "fuse_relu_depthwise_conv_pass is only supported on " LOG(WARNING) << "fuse_relu_depthwise_conv_pass is only supported on "
......
...@@ -114,7 +114,7 @@ struct BuildStrategy { ...@@ -114,7 +114,7 @@ struct BuildStrategy {
// Apply the passes built by the pass_builder_. The passes will be // Apply the passes built by the pass_builder_. The passes will be
// applied to the Program and output an ir::Graph. // applied to the Program and output an ir::Graph.
std::unique_ptr<ir::Graph> Apply(const ProgramDesc &main_program, std::unique_ptr<ir::Graph> Apply(std::unique_ptr<ir::Graph> graph,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
const std::string &loss_var_name, const std::string &loss_var_name,
const std::vector<Scope *> &local_scopes, const std::vector<Scope *> &local_scopes,
......
...@@ -24,12 +24,11 @@ namespace details { ...@@ -24,12 +24,11 @@ namespace details {
FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor( FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes, const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places, ir::Graph *graph)
std::unique_ptr<ir::Graph> &&graph)
: strategy_(strategy), : strategy_(strategy),
local_scopes_(local_scopes), local_scopes_(local_scopes),
places_(places), places_(places),
graph_(std::move(graph)), graph_(graph),
pool_(strategy.num_threads_), pool_(strategy.num_threads_),
prepare_pool_(1), // add one more thread for generate op_deps prepare_pool_(1), // add one more thread for generate op_deps
fetch_ctxs_(places) { fetch_ctxs_(places) {
...@@ -110,14 +109,14 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run( ...@@ -110,14 +109,14 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
} }
} }
if (exception_.IsCaught()) { if (exception_.IsCaught()) {
ClearFetchOp(graph_.get(), &fetch_ops); ClearFetchOp(graph_, &fetch_ops);
exception_.ReThrow(); exception_.ReThrow();
} }
} }
num_complete += num_comp; num_complete += num_comp;
} }
// Wait FetchOps. // Wait FetchOps.
ClearFetchOp(graph_.get(), &fetch_ops); ClearFetchOp(graph_, &fetch_ops);
return fetches; return fetches;
} }
......
...@@ -32,7 +32,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -32,7 +32,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
FastThreadedSSAGraphExecutor(const ExecutionStrategy &strategy, FastThreadedSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> &&graph); ir::Graph *graph);
FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override; FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
const ir::Graph &Graph() const override; const ir::Graph &Graph() const override;
...@@ -40,7 +40,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -40,7 +40,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
ExecutionStrategy strategy_; ExecutionStrategy strategy_;
std::vector<Scope *> local_scopes_; std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_; std::vector<platform::Place> places_;
std::unique_ptr<ir::Graph> graph_; ir::Graph *graph_;
std::unordered_map<OpHandleBase *, int> op_deps_; std::unordered_map<OpHandleBase *, int> op_deps_;
std::vector<OpHandleBase *> bootstrap_ops_; std::vector<OpHandleBase *> bootstrap_ops_;
......
...@@ -228,9 +228,6 @@ TEST(CFGGraph, IRGraph) { ...@@ -228,9 +228,6 @@ TEST(CFGGraph, IRGraph) {
// prepare ir graph // prepare ir graph
auto prog = FillProgramDesc(); auto prog = FillProgramDesc();
ir::Graph graph(prog); ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
ControlFlowGraph cfg(graph); ControlFlowGraph cfg(graph);
cfg.LiveVariableAnalysis(); cfg.LiveVariableAnalysis();
...@@ -256,9 +253,6 @@ TEST(CFGGraph, IRGraph) { ...@@ -256,9 +253,6 @@ TEST(CFGGraph, IRGraph) {
TEST(SortOpLikeDescOrder, NormalTest) { TEST(SortOpLikeDescOrder, NormalTest) {
auto prog = FillProgramDesc(); auto prog = FillProgramDesc();
ir::Graph graph(prog); ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
auto nodes = SortOpLikeDescOrder(graph); auto nodes = SortOpLikeDescOrder(graph);
auto op_descs = prog.Block(0).AllOps(); auto op_descs = prog.Block(0).AllOps();
...@@ -273,9 +267,6 @@ TEST(SortOpLikeDescOrder, NormalTest) { ...@@ -273,9 +267,6 @@ TEST(SortOpLikeDescOrder, NormalTest) {
TEST(SortOpLikeDescOrder, RemoveOpDesc) { TEST(SortOpLikeDescOrder, RemoveOpDesc) {
auto prog = FillProgramDesc(); auto prog = FillProgramDesc();
ir::Graph graph(prog); ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
auto nodes = graph.Nodes(); auto nodes = graph.Nodes();
auto op_descs = prog.Block(0).AllOps(); auto op_descs = prog.Block(0).AllOps();
ir::Node* found_node = nullptr; ir::Node* found_node = nullptr;
...@@ -324,8 +315,6 @@ TEST(SortOpLikeDescOrder, RemoveOpDesc) { ...@@ -324,8 +315,6 @@ TEST(SortOpLikeDescOrder, RemoveOpDesc) {
// 3. add some op_desc // 3. add some op_desc
TEST(SortOpLikeDescOrder, AddOpDesc) { TEST(SortOpLikeDescOrder, AddOpDesc) {
auto prog = FillProgramDesc(); auto prog = FillProgramDesc();
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
ir::Graph graph(prog); ir::Graph graph(prog);
auto find_node_in_graph = [&](std::string s) { auto find_node_in_graph = [&](std::string s) {
...@@ -342,9 +331,7 @@ TEST(SortOpLikeDescOrder, AddOpDesc) { ...@@ -342,9 +331,7 @@ TEST(SortOpLikeDescOrder, AddOpDesc) {
// cached desc different with real one // cached desc different with real one
// mimic the intermediate pass modifying the programdesc. // mimic the intermediate pass modifying the programdesc.
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership std::vector<OpDesc*> op_descs = graph.OriginProgram().Block(0).AllOps();
auto op_descs = prog.Block(0).AllOps();
auto op = prog.MutableBlock(0)->AppendOp(); auto op = prog.MutableBlock(0)->AppendOp();
prog.MutableBlock(0)->Var("d1")->SetType(proto::VarType::LOD_TENSOR); prog.MutableBlock(0)->Var("d1")->SetType(proto::VarType::LOD_TENSOR);
...@@ -376,9 +363,6 @@ TEST(SortOpLikeDescOrder, AddOpDesc) { ...@@ -376,9 +363,6 @@ TEST(SortOpLikeDescOrder, AddOpDesc) {
TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) { TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) {
auto prog = FillProgramDesc(); auto prog = FillProgramDesc();
ir::Graph graph(prog); ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
auto find_node_in_graph = [&](std::string s) { auto find_node_in_graph = [&](std::string s) {
ir::Node* ret = nullptr; ir::Node* ret = nullptr;
...@@ -392,8 +376,9 @@ TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) { ...@@ -392,8 +376,9 @@ TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) {
return ret; return ret;
}; };
std::vector<OpDesc*> op_descs = graph.OriginProgram().Block(0).AllOps();
// remove sum node // remove sum node
auto op_descs = prog.Block(0).AllOps();
ir::Node* found_node = nullptr; ir::Node* found_node = nullptr;
auto nodes = graph.Nodes(); auto nodes = graph.Nodes();
for (auto node : nodes) { for (auto node : nodes) {
...@@ -454,9 +439,7 @@ TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) { ...@@ -454,9 +439,7 @@ TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) {
TEST(SortOpLikeDescOrder, AddAndReplaceOpDescInplace) { TEST(SortOpLikeDescOrder, AddAndReplaceOpDescInplace) {
auto prog = FillProgramDesc(); auto prog = FillProgramDesc();
ir::Graph graph(prog); ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs = std::vector<OpDesc*> op_descs = graph.OriginProgram().Block(0).AllOps();
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
auto find_node_in_graph = [&](std::string s) { auto find_node_in_graph = [&](std::string s) {
ir::Node* ret = nullptr; ir::Node* ret = nullptr;
...@@ -470,7 +453,6 @@ TEST(SortOpLikeDescOrder, AddAndReplaceOpDescInplace) { ...@@ -470,7 +453,6 @@ TEST(SortOpLikeDescOrder, AddAndReplaceOpDescInplace) {
return ret; return ret;
}; };
auto op_descs = prog.Block(0).AllOps();
// add node // add node
auto op = prog.MutableBlock(0)->AppendOp(); auto op = prog.MutableBlock(0)->AppendOp();
prog.MutableBlock(0)->Var("d1")->SetType(proto::VarType::LOD_TENSOR); prog.MutableBlock(0)->Var("d1")->SetType(proto::VarType::LOD_TENSOR);
......
...@@ -336,5 +336,4 @@ void MemoryOptimizePass::RenameVarInGraphNode(const std::string& var, ...@@ -336,5 +336,4 @@ void MemoryOptimizePass::RenameVarInGraphNode(const std::string& var,
} // namespace paddle } // namespace paddle
REGISTER_PASS(memory_optimize_pass, REGISTER_PASS(memory_optimize_pass,
paddle::framework::details::MemoryOptimizePass) paddle::framework::details::MemoryOptimizePass);
.RequireGraphAttr(paddle::framework::details::kAllOpDescs);
...@@ -20,8 +20,7 @@ namespace framework { ...@@ -20,8 +20,7 @@ namespace framework {
namespace details { namespace details {
std::vector<std::unique_ptr<ir::Graph>> std::vector<std::unique_ptr<ir::Graph>>
ParallelSSAGraphExecutor::SeparateMultiDevicesGraph( ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(ir::Graph* graph) {
std::unique_ptr<ir::Graph> &&graph) {
std::vector<std::unique_ptr<ir::Graph>> graphs; std::vector<std::unique_ptr<ir::Graph>> graphs;
graphs.reserve(places_.size()); graphs.reserve(places_.size());
for (size_t i = 0; i < places_.size(); ++i) { for (size_t i = 0; i < places_.size(); ++i) {
...@@ -78,7 +77,7 @@ ParallelSSAGraphExecutor::SeparateMultiDevicesGraph( ...@@ -78,7 +77,7 @@ ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(
ParallelSSAGraphExecutor::ParallelSSAGraphExecutor( ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes, const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
const framework::ProgramDesc &main_prog, std::unique_ptr<ir::Graph> &&graph) const framework::ProgramDesc &main_prog, ir::Graph* graph)
: strategy_(std::move(strategy)), : strategy_(std::move(strategy)),
local_scopes_(std::move(local_scopes)), local_scopes_(std::move(local_scopes)),
pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr), pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr),
...@@ -86,7 +85,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor( ...@@ -86,7 +85,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
main_prog_(main_prog), main_prog_(main_prog),
// TODO(Yancey1989): Copying graphs is not safe since it deletes the // TODO(Yancey1989): Copying graphs is not safe since it deletes the
// attrs. // attrs.
graphs_(SeparateMultiDevicesGraph(std::move(graph))) { graphs_(SeparateMultiDevicesGraph(graph)) {
PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
auto seq_allreduce_pass = auto seq_allreduce_pass =
...@@ -107,7 +106,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor( ...@@ -107,7 +106,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
<< " to run the operators of the graph on each device."; << " to run the operators of the graph on each device.";
for (size_t i = 0; i < places.size(); ++i) { for (size_t i = 0; i < places.size(); ++i) {
executors_.emplace_back(new details::ThreadedSSAGraphExecutor( executors_.emplace_back(new details::ThreadedSSAGraphExecutor(
strategy_, local_scopes_, {places_[i]}, std::move(graphs_.at(i)))); strategy_, local_scopes_, {places_[i]}, graphs_.at(i).get()));
} }
} }
......
...@@ -32,7 +32,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor { ...@@ -32,7 +32,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor {
const std::vector<Scope *> &local_scopes, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
const framework::ProgramDesc &main_prog, const framework::ProgramDesc &main_prog,
std::unique_ptr<ir::Graph> &&graph); ir::Graph* graph);
~ParallelSSAGraphExecutor() final = default; ~ParallelSSAGraphExecutor() final = default;
const ir::Graph &Graph() const override { return *graphs_[0]; } const ir::Graph &Graph() const override { return *graphs_[0]; }
...@@ -41,7 +41,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor { ...@@ -41,7 +41,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor {
private: private:
std::vector<std::unique_ptr<ir::Graph>> SeparateMultiDevicesGraph( std::vector<std::unique_ptr<ir::Graph>> SeparateMultiDevicesGraph(
std::unique_ptr<ir::Graph> &&graph); ir::Graph* graph);
ExecutionStrategy strategy_; ExecutionStrategy strategy_;
std::vector<Scope *> local_scopes_; std::vector<Scope *> local_scopes_;
......
...@@ -23,9 +23,8 @@ namespace framework { ...@@ -23,9 +23,8 @@ namespace framework {
namespace details { namespace details {
ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes, const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places, ir::Graph *graph)
std::unique_ptr<ir::Graph> &&graph) : graph_(graph),
: graph_(std::move(graph)),
pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_)
: nullptr), : nullptr),
local_scopes_(local_scopes), local_scopes_(local_scopes),
...@@ -110,7 +109,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -110,7 +109,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
for (auto &run_op_future : run_op_futures_) { for (auto &run_op_future : run_op_futures_) {
run_op_future.wait(); run_op_future.wait();
} }
ClearFetchOp(graph_.get(), &fetch_ops); ClearFetchOp(graph_, &fetch_ops);
exception_holder_.ReThrow(); exception_holder_.ReThrow();
} else { } else {
continue; continue;
...@@ -135,7 +134,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run( ...@@ -135,7 +134,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
} }
PADDLE_ENFORCE(ready_ops.empty()); PADDLE_ENFORCE(ready_ops.empty());
// Wait FetchOps. // Wait FetchOps.
ClearFetchOp(graph_.get(), &fetch_ops); ClearFetchOp(graph_, &fetch_ops);
return fetch_data; return fetch_data;
} }
......
...@@ -41,7 +41,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -41,7 +41,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy, ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> &&graph); ir::Graph *graph);
const ir::Graph &Graph() const override { return *graph_; } const ir::Graph &Graph() const override { return *graph_; }
// Run a SSAGraph by a thread pool // Run a SSAGraph by a thread pool
...@@ -55,7 +55,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ...@@ -55,7 +55,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
details::OpHandleBase *op); details::OpHandleBase *op);
private: private:
std::unique_ptr<ir::Graph> graph_; ir::Graph *graph_;
std::unique_ptr<::ThreadPool> pool_; std::unique_ptr<::ThreadPool> pool_;
std::vector<Scope *> local_scopes_; std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_; std::vector<platform::Place> places_;
......
...@@ -195,6 +195,22 @@ class Graph { ...@@ -195,6 +195,22 @@ class Graph {
return nullptr; return nullptr;
} }
// Returns reference to the original program.
// WARN: After a series of passes, the current graph can be quite
// different from OriginProgram. Caller shouldn't assume much from
// the returned OriginProgram.
const ProgramDesc &OriginProgram() const { return program_; }
void ResolveHazard(
const std::map<std::string, std::vector<ir::Node *>> &var_nodes);
private:
std::map<std::string, std::vector<ir::Node *>> InitFromProgram(
const ProgramDesc &program);
// This method takes ownership of `node`. // This method takes ownership of `node`.
ir::Node *AddNode(ir::Node *node) { ir::Node *AddNode(ir::Node *node) {
PADDLE_ENFORCE(node_set_.find(node) == node_set_.end()); PADDLE_ENFORCE(node_set_.find(node) == node_set_.end());
......
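Graph now records the ProgramDesc it was built from and exposes it via OriginProgram(); the pybind change further down surfaces this as origin_program_desc. A hedged sketch of recovering a Program from a graph, combining this accessor with the _construct_from_desc helper that this commit marks experimental:

```python
import paddle.fluid as fluid
from paddle.fluid import core
from paddle.fluid.framework import Program

main = fluid.Program()
with fluid.program_guard(main):
    fluid.layers.data(name='x', shape=[4], dtype='float32')

g = core.Graph(main.desc)         # lower the program to an IR graph
desc = g.origin_program_desc()    # the ProgramDesc the graph started from
# Per the WARN above, passes may have mutated the graph since, so don't
# assume it still matches this desc. Rebuilding a Program is experimental:
recovered = Program._construct_from_desc(desc)
```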
...@@ -184,7 +184,7 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() { ...@@ -184,7 +184,7 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
ParallelExecutor::ParallelExecutor( ParallelExecutor::ParallelExecutor(
const std::vector<platform::Place> &places, const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &bcast_vars, const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const std::string &loss_var_name, const std::vector<ir::Graph *> &graphs, const std::string &loss_var_name,
Scope *scope, const std::vector<Scope *> &local_scopes, Scope *scope, const std::vector<Scope *> &local_scopes,
const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy) const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy)
: member_(new ParallelExecutorPrivate(places)) { : member_(new ParallelExecutorPrivate(places)) {
...@@ -216,15 +216,34 @@ ParallelExecutor::ParallelExecutor( ...@@ -216,15 +216,34 @@ ParallelExecutor::ParallelExecutor(
} }
} }
  // TODO(panyx0718): Update pass interface so we don't need this here.
  std::vector<std::unique_ptr<ir::Graph>> temp_owned_graphs;
  for (ir::Graph *g : graphs) {
    temp_owned_graphs.emplace_back(g);
  }

  bool parallel_graphs = (temp_owned_graphs.size() > 1);
  if (parallel_graphs) {
    PADDLE_ENFORCE_EQ(temp_owned_graphs.size(), places.size());
  }
  VLOG(1) << "Enable ParallelGraph Execution: " << parallel_graphs;
if (member_->use_cuda_) { if (member_->use_cuda_) {
// Bcast Parameters to all GPUs // Bcast Parameters to all GPUs
...@@ -236,7 +255,7 @@ ParallelExecutor::ParallelExecutor( ...@@ -236,7 +255,7 @@ ParallelExecutor::ParallelExecutor(
if (nccl_id_var != nullptr) { if (nccl_id_var != nullptr) {
nccl_id = nccl_id_var->GetMutable<ncclUniqueId>(); nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
} }
if (build_strategy.enable_parallel_graph_ && member_->nranks_ > 1UL) { if (parallel_graphs && member_->nranks_ > 1UL) {
if (nccl_id == nullptr) { if (nccl_id == nullptr) {
local_nccl_id_.reset(new ncclUniqueId()); local_nccl_id_.reset(new ncclUniqueId());
platform::dynload::ncclGetUniqueId(local_nccl_id_.get()); platform::dynload::ncclGetUniqueId(local_nccl_id_.get());
...@@ -258,44 +277,101 @@ ParallelExecutor::ParallelExecutor( ...@@ -258,44 +277,101 @@ ParallelExecutor::ParallelExecutor(
// Step 2. Convert main_program to SSA form and dependency graph. Also, insert // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp // ncclOp
std::vector<ir::Graph *> compiled_graphs;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
if (parallel_graphs) {
for (size_t i = 0; i < member_->places_.size(); ++i) {
auto temp_owned_graph = build_strategy.Apply(
std::move(temp_owned_graphs[i]), {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_,
member_->nccl_ctxs_.get());
compiled_graphs.push_back(temp_owned_graph.release());
}
} else {
auto temp_owned_graph = build_strategy.Apply(
std::move(temp_owned_graphs[0]), member_->places_, loss_var_name,
member_->local_scopes_, member_->nranks_, member_->use_cuda_,
member_->nccl_ctxs_.get());
compiled_graphs.push_back(temp_owned_graph.release());
}
#else
auto temp_owned_graph = build_strategy.Apply(
std::move(temp_owned_graphs[0]), member_->places_, loss_var_name,
member_->local_scopes_, member_->nranks_, member_->use_cuda_);
compiled_graphs.push_back(temp_owned_graph.release());
#endif #endif
auto max_memory_size = GetEagerDeletionThreshold(); auto max_memory_size = GetEagerDeletionThreshold();
VLOG(10) << "Eager Deletion Threshold " VLOG(10) << "Eager Deletion Threshold "
<< static_cast<float>(max_memory_size) / (1 << 30); << static_cast<float>(max_memory_size) / (1 << 30);
if (max_memory_size >= 0) { if (max_memory_size >= 0) {
for (size_t i = 0; i < graphs.size(); ++i) {
compiled_graphs[i] =
member_
->PrepareGCAndRefCnts(
std::unique_ptr<ir::Graph>(compiled_graphs[i]),
static_cast<size_t>(max_memory_size))
.release();
}
} }
// Step 3. Create vars in each scope. Passes may also create new vars. // Step 3. Create vars in each scope. Passes may also create new vars.
// skip control vars and empty vars // skip control vars and empty vars
std::vector<details::VariableInfo> var_infos; std::vector<details::VariableInfo> var_infos;
for (auto &graph : compiled_graphs) {
for (auto &node : graph->Nodes()) {
if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
var_infos.emplace_back();
var_infos.back().name_ = node->Var()->Name();
var_infos.back().type_ = node->Var()->GetType();
var_infos.back().persistable_ = node->Var()->Persistable();
}
} }
} }
// If the loss_var_name is given, the number of graph should be only one. // If the loss_var_name is given, the number of graph should be only one.
if (loss_var_name.size()) { if (loss_var_name.size()) {
size_t graph_num = ir::GraphNum(*compiled_graphs[0]);
if (graph_num > 1) { if (graph_num > 1) {
LOG(WARNING) LOG(WARNING)
<< "The number of graph should be only one, " << "The number of graph should be only one, "
"but the current graph has " "but the current graph has "
<< ir::GraphNum(*compiled_graphs[0])
<< " sub_graphs. If you want to see the nodes of the " << " sub_graphs. If you want to see the nodes of the "
"sub_graphs, you should use 'FLAGS_print_sub_graph_dir' " "sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
"to specify the output dir. NOTES: if you not do training, " "to specify the output dir. NOTES: if you not do training, "
...@@ -303,26 +379,42 @@ ParallelExecutor::ParallelExecutor( ...@@ -303,26 +379,42 @@ ParallelExecutor::ParallelExecutor(
} }
} }
  if (parallel_graphs) {
#ifdef PADDLE_WITH_CUDA
    member_->executor_.reset(new details::ParallelSSAGraphExecutor(
        exec_strategy, member_->local_scopes_, member_->places_,
        compiled_graphs));
#else
    PADDLE_THROW(
        "Paddle should be compiled with CUDA for ParallelGraph Execution.");
#endif
  } else {
    if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
      member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
          exec_strategy, member_->local_scopes_, member_->places_,
          compiled_graphs[0]));
    } else {
      member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
          exec_strategy, member_->local_scopes_, member_->places_,
          compiled_graphs[0]));
    }
  }
...@@ -452,24 +544,33 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes( ...@@ -452,24 +544,33 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
} }
} }
bool ParallelExecutor::EnableParallelGraphExecution( ParallelExecutor::~ParallelExecutor() {
const ProgramDesc &main_program, const ExecutionStrategy &exec_strategy, for (auto &p : member_->places_) {
const BuildStrategy &build_strategy) const { platform::DeviceContextPool::Instance().Get(p)->Wait();
}
delete member_;
}
bool EnableParallelGraphExecution(const ir::Graph &graph,
const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy) {
if (!FLAGS_enable_parallel_graph) return false; if (!FLAGS_enable_parallel_graph) return false;
bool enable_parallel_graph = true; bool enable_parallel_graph = true;
// TODO(Yancey1989): support sparse update in ParallelGraph mode.
for (auto &var_desc : main_program.Block(0).AllVars()) {
if (var_desc->GetType() == proto::VarType::SELECTED_ROWS) {
enable_parallel_graph = false;
}
}
// TODO(Yancey1989): support pserver mode for (ir::Node *node : graph.Nodes()) {
for (auto &op_desc : main_program.Block(0).AllOps()) { if (node->IsVar() && node->Var()) {
if (op_desc->Type() == "send" || op_desc->Type() == "recv") { // TODO(Yancey1989): support sparse update in ParallelGraph mode.
enable_parallel_graph = false; if (node->Var()->GetType() == proto::VarType::SELECTED_ROWS) {
break; enable_parallel_graph = false;
break;
}
} else if (node->IsOp() && node->Op()) {
// TODO(Yancey1989): support pserver mode
if (node->Op()->Type() == "send" || node->Op()->Type() == "recv") {
enable_parallel_graph = false;
break;
}
} }
} }
...@@ -481,13 +582,6 @@ bool ParallelExecutor::EnableParallelGraphExecution( ...@@ -481,13 +582,6 @@ bool ParallelExecutor::EnableParallelGraphExecution(
return enable_parallel_graph; return enable_parallel_graph;
} }
ParallelExecutor::~ParallelExecutor() {
for (auto &p : member_->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
delete member_;
}
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
......
...@@ -46,7 +46,7 @@ class ParallelExecutor { ...@@ -46,7 +46,7 @@ class ParallelExecutor {
public: public:
explicit ParallelExecutor(const std::vector<platform::Place> &places, explicit ParallelExecutor(const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &bcast_vars, const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const std::vector<ir::Graph *> &graphs,
const std::string &loss_var_name, Scope *scope, const std::string &loss_var_name, Scope *scope,
const std::vector<Scope *> &local_scopes, const std::vector<Scope *> &local_scopes,
const ExecutionStrategy &exec_strategy, const ExecutionStrategy &exec_strategy,
...@@ -71,9 +71,6 @@ class ParallelExecutor { ...@@ -71,9 +71,6 @@ class ParallelExecutor {
private: private:
void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const; void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
bool EnableParallelGraphExecution(const ProgramDesc &main_program,
const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy) const;
ParallelExecutorPrivate *member_; ParallelExecutorPrivate *member_;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
...@@ -81,5 +78,9 @@ class ParallelExecutor { ...@@ -81,5 +78,9 @@ class ParallelExecutor {
#endif #endif
}; };
bool EnableParallelGraphExecution(const ir::Graph &graph,
const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy);
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -101,7 +101,8 @@ void BindGraph(py::module *m) { ...@@ -101,7 +101,8 @@ void BindGraph(py::module *m) {
[](Graph &self, Node &node) { return self.RemoveNode(&node); }) [](Graph &self, Node &node) { return self.RemoveNode(&node); })
.def("retrieve_node", &Graph::RetrieveNode, .def("retrieve_node", &Graph::RetrieveNode,
return_value_policy::reference) return_value_policy::reference)
.def("resolve_hazard", &Graph::ResolveHazard); .def("resolve_hazard", &Graph::ResolveHazard)
.def("origin_program_desc", &Graph::OriginProgram);
} }
void BindNode(py::module *m) { void BindNode(py::module *m) {
......
...@@ -976,6 +976,9 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -976,6 +976,9 @@ All parameter, weight, gradient are variables in Paddle.
[](ir::PassBuilder &self, size_t idx) { self.RemovePass(idx); }); [](ir::PassBuilder &self, size_t idx) { self.RemovePass(idx); });
// -- python binds for parallel executor. // -- python binds for parallel executor.
m.def("_enable_parallel_graph_execution",
framework::EnableParallelGraphExecution);
py::class_<ParallelExecutor> pe(m, "ParallelExecutor"); py::class_<ParallelExecutor> pe(m, "ParallelExecutor");
py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy", R"DOC( py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy", R"DOC(
ExecutionStrategy allows the user to more precisely control how to run ExecutionStrategy allows the user to more precisely control how to run
...@@ -1213,9 +1216,10 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -1213,9 +1216,10 @@ All parameter, weight, gradient are variables in Paddle.
cannot be updated after being finalized.)DOC"); cannot be updated after being finalized.)DOC");
pe.def(py::init<const std::vector<platform::Place> &, pe.def(py::init<const std::vector<platform::Place> &,
const std::unordered_set<std::string> &, const ProgramDesc &, const std::unordered_set<std::string> &,
const std::string &, Scope *, std::vector<Scope *> &, const std::vector<ir::Graph *> &, const std::string &,
const ExecutionStrategy &, const BuildStrategy &>()) Scope *, std::vector<Scope *> &, const ExecutionStrategy &,
const BuildStrategy &>())
// NOTE: even we return a vec<Scope*>* to Python use reference policy. // NOTE: even we return a vec<Scope*>* to Python use reference policy.
// We still cannot get local_scope from this vector, since the element // We still cannot get local_scope from this vector, since the element
// of vec<Scope*> will be freed by Python GC. We can only return Scope* // of vec<Scope*> will be freed by Python GC. We can only return Scope*
......
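The new _enable_parallel_graph_execution binding lets Python decide up front whether to shard execution into one graph per place, which is what compiler.py below does. A sketch of the check (default strategy objects; the leading underscore marks this as a private API):

```python
import paddle.fluid as fluid
from paddle.fluid import core

main = fluid.Program()
with fluid.program_guard(main):
    fluid.layers.data(name='x', shape=[4], dtype='float32')

graph = core.Graph(main.desc)
# False when the graph holds SELECTED_ROWS vars (sparse updates) or
# send/recv ops (pserver mode), or FLAGS_enable_parallel_graph is off.
use_parallel_graph = core._enable_parallel_graph_execution(
    graph, fluid.ExecutionStrategy(), fluid.BuildStrategy())
```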
...@@ -17,6 +17,7 @@ import os ...@@ -17,6 +17,7 @@ import os
import six import six
import sys import sys
from .. import compat as cpt from .. import compat as cpt
from . import framework
from . import core from . import core
...@@ -36,7 +37,7 @@ def _place_obj(place): ...@@ -36,7 +37,7 @@ def _place_obj(place):
class CompiledProgram(object): class CompiledProgram(object):
""" """
Compiles a Program for execution. Compiles to Graph for execution.
1. Users first create the program with layers. 1. Users first create the program with layers.
2. Optionally, users use CompiledProgram to optimize the program before run. 2. Optionally, users use CompiledProgram to optimize the program before run.
...@@ -51,7 +52,7 @@ class CompiledProgram(object): ...@@ -51,7 +52,7 @@ class CompiledProgram(object):
Example: Example:
.. code-block:: python .. code-block:: python
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() place = fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(startup) exe.run(startup)
compiled_prog = compiler.CompiledProgram(main).with_data_parallel( compiled_prog = compiler.CompiledProgram(main).with_data_parallel(
...@@ -62,11 +63,25 @@ class CompiledProgram(object): ...@@ -62,11 +63,25 @@ class CompiledProgram(object):
fetch_list=[loss.name]) fetch_list=[loss.name])
Args: Args:
program: Program instance that contains the model logic. program_or_graph (Graph|Program): If it's Program, it will be first
lowered to a graph for further optimizations. If it's a graph
(potentially optimized before), it will be directly used for
further optimizations. Note: graph is only supported when compiled
with with_data_parallel option.
""" """
def __init__(self, program): def __init__(self, program_or_graph):
self._program = program if isinstance(program_or_graph, core.Graph):
self._graph = program_or_graph
self._program = None
elif isinstance(program_or_graph, framework.Program):
self._graph = core.Graph(program_or_graph.desc)
self._program = program_or_graph
else:
raise ValueError("Wrong program_or_graph type: %s" %
type(program_or_graph))
self._program_desc = self._graph.origin_program_desc()
self._scope = None self._scope = None
self._place = None self._place = None
self._executor = None self._executor = None
...@@ -101,6 +116,7 @@ class CompiledProgram(object): ...@@ -101,6 +116,7 @@ class CompiledProgram(object):
self self
""" """
assert not self._is_data_parallel, "Already compiled with parallel." assert not self._is_data_parallel, "Already compiled with parallel."
assert not self._is_inference, "Cannot compile both data parallel and inference"
self._is_data_parallel = True self._is_data_parallel = True
self._build_strategy = build_strategy self._build_strategy = build_strategy
self._exec_strategy = exec_strategy self._exec_strategy = exec_strategy
...@@ -120,11 +136,13 @@ class CompiledProgram(object): ...@@ -120,11 +136,13 @@ class CompiledProgram(object):
Returns: Returns:
self self
""" """
assert not self._is_data_parallel, "Cannot compile both data parallel and inference."
assert not self._is_inference, "Already compiled with inference"
assert any([ assert any([
isinstance(config, InferNativeConfig), isinstance(config, InferNativeConfig),
isinstance(config, InferAnalysisConfig) isinstance(config, InferAnalysisConfig)
]) ])
self._is_data_parallel = False
self._is_inference = True self._is_inference = True
self._infer_config = config self._infer_config = config
return self return self
...@@ -173,37 +191,56 @@ class CompiledProgram(object): ...@@ -173,37 +191,56 @@ class CompiledProgram(object):
os.environ.get('CPU_NUM', multiprocessing.cpu_count())) os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
self._exec_strategy.num_threads = cpu_num * 2 self._exec_strategy.num_threads = cpu_num * 2
trainers_endpoints = self._program._trainers_endpoints
# FIXME(dzhwinter): enable_inplace should be after memory_optimize # FIXME(dzhwinter): enable_inplace should be after memory_optimize
# if turn on python memory optimize, turn off the inplace_pass. # if turn on python memory optimize, turn off the inplace_pass.
if self._build_strategy.memory_optimize is None: if self._build_strategy.memory_optimize is None:
self._build_strategy.memory_optimize = False if self._program._is_mem_optimized else True self._build_strategy.memory_optimize = False if self._program and self._program._is_mem_optimized else True
if self._build_strategy.enable_inplace is None: if self._build_strategy.enable_inplace is None:
self._build_strategy.enable_inplace = False if self._program._is_mem_optimized else True self._build_strategy.enable_inplace = False if self._program and self._program._is_mem_optimized else True
# TODO(wuyi): trainer endpoints should be passed in through
# build_strategy, not program.xxx.
if self._program and self._build_strategy.num_trainers > 1 and \
self._program._trainers_endpoints:
tps = self._program._trainers_endpoints
if self._build_strategy.num_trainers > 1 and trainers_endpoints:
assert self._build_strategy.num_trainers == len( assert self._build_strategy.num_trainers == len(
trainers_endpoints), "num_trainers == len(end_points)" tps), "num_trainers == len(end_points)"
self._build_strategy.trainers_endpoints = trainers_endpoints self._build_strategy.trainers_endpoints = tps
self._persistable_vars = set([ self._persistable_vars = []
cpt.to_text(v.name) for block_id in range(self._program_desc.num_blocks()):
for v in [ bdesc = self._program_desc.block(block_id)
var for var in self._program.list_vars() self._persistable_vars.extend([
if var.persistable and var.type != core.VarDesc.VarType.RAW cpt.to_text(v.name()) for v in bdesc.all_vars()
] if v.persistable() and v.type() != core.VarDesc.VarType.RAW
]) ])
places = list(map(_place_obj, self._places)) places = list(map(_place_obj, self._places))
# FIXME(Yancey1989): parallel graph mode gets better performance
# in GPU allreduce distributed training. Need an elegant way to
# choose the execution strategy.
enable_parallel_graph = \
core._enable_parallel_graph_execution(self._graph,
self._exec_strategy,
self._build_strategy) and \
self._program  # only supported when compiling a Program, not a Graph.
self._pe_graphs = [self._graph]
if enable_parallel_graph:
for _ in range(len(places) - 1):
self._pe_graphs.append(core.Graph(self._program_desc))
return core.ParallelExecutor( return core.ParallelExecutor(
places, self._persistable_vars, self._program.desc, places,
set(self._persistable_vars), self._pe_graphs,
cpt.to_text(self._loss_name) cpt.to_text(self._loss_name)
if self._loss_name else six.u(''), self._scope, self._local_scopes, if self._loss_name else six.u(''), self._scope, self._local_scopes,
self._exec_strategy, self._build_strategy) self._exec_strategy, self._build_strategy)
def _compile_inference(self): def _compile_inference(self):
assert self._is_data_parallel is False
return core.create_paddle_predictor(self._infer_config) return core.create_paddle_predictor(self._infer_config)
def _compile(self, scope, place): def _compile(self, scope, place):
......
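To summarize the rewritten __init__: a CompiledProgram built from a Program keeps both self._program and self._graph, while one built from a graph has self._program = None, so the Program-dependent branches (memory-optimize defaults, trainer endpoints, plain Executor.run) are skipped or asserted against. A small sketch of the two entry points:

```python
import paddle.fluid as fluid
from paddle.fluid import compiler, core

main, startup = fluid.Program(), fluid.Program()
with fluid.program_guard(main, startup):
    x = fluid.layers.data(name='x', shape=[4], dtype='float32')
    loss = fluid.layers.mean(fluid.layers.fc(x, size=1))

# Path 1: from a Program; plain Executor.run(compiled) still works.
cp_prog = compiler.CompiledProgram(main)

# Path 2 (new): from a core.Graph; only the with_data_parallel path is
# supported, since self._program is None for graph inputs.
cp_graph = compiler.CompiledProgram(core.Graph(main.desc)).with_data_parallel(
    loss_name=loss.name)
```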
# copyright (c) 2018 paddlepaddle authors. all rights reserved.
#
# licensed under the apache license, version 2.0 (the "license");
# you may not use this file except in compliance with the license.
# you may obtain a copy of the license at
#
# http://www.apache.org/licenses/license-2.0
#
# unless required by applicable law or agreed to in writing, software
# distributed under the license is distributed on an "as is" basis,
# without warranties or conditions of any kind, either express or implied.
# see the license for the specific language governing permissions and
# limitations under the license.
import unittest
import random
import numpy as np
import paddle.fluid as fluid
import six
from paddle.fluid.framework import Program
from paddle.fluid.framework import IrGraph
from paddle.fluid.contrib.slim.quantization import QuantizationTransformPass
from paddle.fluid import core
def linear_fc(num):
data = fluid.layers.data(name='image', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
hidden = data
for _ in six.moves.xrange(num):
hidden = fluid.layers.fc(hidden, size=128, act='relu')
fc = fluid.layers.fc(input=hidden, size=10)
loss = fluid.layers.softmax_with_cross_entropy(fc, label=label)
loss = fluid.layers.mean(loss)
return loss
def residual_block(num):
def conv_bn_layer(input,
ch_out,
filter_size,
stride,
padding,
act='relu',
bias_attr=False):
tmp = fluid.layers.conv2d(
input=input,
filter_size=filter_size,
num_filters=ch_out,
stride=stride,
padding=padding,
act=None,
bias_attr=bias_attr)
return fluid.layers.batch_norm(input=tmp, act=act)
data = fluid.layers.data(name='image', shape=[1, 28, 28], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
hidden = data
for _ in six.moves.xrange(num):
conv = conv_bn_layer(hidden, 16, 3, 1, 1, act=None, bias_attr=True)
short = conv_bn_layer(hidden, 16, 1, 1, 0, act=None)
hidden = fluid.layers.elementwise_add(x=conv, y=short, act='relu')
fc = fluid.layers.fc(input=hidden, size=10)
loss = fluid.layers.softmax_with_cross_entropy(fc, label)
loss = fluid.layers.mean(loss)
return loss
class TestQuantizationTransformPass(unittest.TestCase):
def setUp(self):
self.quantizable_op_and_inputs = {
'conv2d': ['Input', 'Filter'],
'depthwise_conv2d': ['Input', 'Filter'],
'mul': ['X', 'Y']
}
self.quantizable_grad_op_inputs = {
'conv2d_grad': ['Input', 'Filter'],
'depthwise_conv2d_grad': ['Input', 'Filter'],
'mul_grad': ['X', 'Y']
}
def check_program(self, transform_pass, program):
quantized_ops = set()
for block in program.blocks:
for op in block.ops:
# check forward
if op.type in self.quantizable_op_and_inputs:
for arg_name in op.input_arg_names:
self.assertTrue(
arg_name.endswith('.quantized.dequantized'))
quantized_ops.add(arg_name)
for op in block.ops:
# check backward
if op.type in self.quantizable_grad_op_inputs:
for pname in self.quantizable_grad_op_inputs[op.type]:
arg_name = op.input(pname)[0]
self.assertTrue(
arg_name.endswith('.quantized.dequantized'))
self.assertTrue(arg_name in quantized_ops)
def linear_fc_quant(self, quant_type):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
loss = linear_fc(3)
opt = fluid.optimizer.Adam(learning_rate=0.001)
opt.minimize(loss)
exe = fluid.Executor(fluid.CPUPlace())
graph = IrGraph(core.Graph(main.desc), for_test=False)
transform_pass = QuantizationTransformPass(
scope=fluid.global_scope(),
program_exe=exe,
activation_quantize_type=quant_type)
transform_pass.apply(graph)
marked_nodes = set()
for op in graph.all_ops():
if op.name().find('quantize') > -1:
marked_nodes.add(op)
graph.draw('.', 'quantize_fc_' + quant_type, marked_nodes)
program = graph.to_program()
self.check_program(transform_pass, program)
val_graph = IrGraph(core.Graph(program.desc), for_test=False)
val_marked_nodes = set()
for op in val_graph.all_ops():
if op.name().find('quantize') > -1:
val_marked_nodes.add(op)
val_graph.draw('.', 'val_fc_' + quant_type, val_marked_nodes)
def test_linear_fc_quant_abs_max(self):
self.act_quant_op_type = 'fake_quantize_abs_max'
self.linear_fc_quant('abs_max')
def test_linear_fc_quant_range_abs_max(self):
self.act_quant_op_type = 'fake_quantize_range_abs_max'
self.linear_fc_quant('range_abs_max')
def residual_block_quant(self, quant_type):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
loss = residual_block(2)
opt = fluid.optimizer.Adam(learning_rate=0.001)
opt.minimize(loss)
exe = fluid.Executor(fluid.CPUPlace())
graph = IrGraph(core.Graph(main.desc), for_test=False)
transform_pass = QuantizationTransformPass(
scope=fluid.global_scope(),
program_exe=exe,
activation_quantize_type=quant_type)
transform_pass.apply(graph)
marked_nodes = set()
for op in graph.all_ops():
if op.name().find('quantize') > -1:
marked_nodes.add(op)
graph.draw('.', 'quantize_residual_' + quant_type, marked_nodes)
program = graph.to_program()
self.check_program(transform_pass, program)
val_graph = IrGraph(core.Graph(program.desc), for_test=False)
val_marked_nodes = set()
for op in val_graph.all_ops():
if op.name().find('quantize') > -1:
val_marked_nodes.add(op)
val_graph.draw('.', 'val_residual_' + quant_type, val_marked_nodes)
def test_residual_block_abs_max(self):
self.act_quant_op_type = 'fake_quantize_abs_max'
self.residual_block_quant('abs_max')
def test_residual_block_range_abs_max(self):
self.act_quant_op_type = 'fake_quantize_range_abs_max'
self.residual_block_quant('range_abs_max')
def test_execute_graph(self):
main = fluid.Program()
startup = fluid.Program()
with fluid.program_guard(main, startup):
loss = linear_fc(3)
opt = fluid.optimizer.Adam(learning_rate=0.0001)
opt.minimize(loss)
exe = fluid.Executor(fluid.CPUPlace())
graph = IrGraph(core.Graph(main.desc), for_test=False)
exe.run(startup)
binary = fluid.CompiledProgram(graph.graph).with_data_parallel(
loss_name=loss.name)
for i in range(10):
loss_val = exe.run(binary,
feed={
'image': np.ones(
[32, 784], dtype=np.float32),
'label': np.ones(
[32, 1], dtype=np.int64)
},
fetch_list=[loss])
if i == 0:
start_loss = np.sum(loss_val)
elif i == 9:
end_loss = np.sum(loss_val)
self.assertLess(end_loss, start_loss)
if __name__ == '__main__':
unittest.main()
...@@ -538,6 +538,7 @@ class Executor(object): ...@@ -538,6 +538,7 @@ class Executor(object):
else: else:
# TODO(panyx0718): Can compile program to optimize executor # TODO(panyx0718): Can compile program to optimize executor
# performance. # performance.
assert program._program, "CompiledProgram is compiled from a graph and can only be run with with_data_parallel."
return self._run( return self._run(
program._program, program._program,
self._default_executor, self._default_executor,
......
...@@ -2322,7 +2322,7 @@ class Program(object): ...@@ -2322,7 +2322,7 @@ class Program(object):
@staticmethod @staticmethod
def _construct_from_desc(desc): def _construct_from_desc(desc):
""" """
Construct a program from program desc. Construct a program from program desc. (Experimental)
Args: Args:
desc(core.ProgramDesc): The program desc for constructing. desc(core.ProgramDesc): The program desc for constructing.
...@@ -2332,6 +2332,7 @@ class Program(object): ...@@ -2332,6 +2332,7 @@ class Program(object):
""" """
p = Program() p = Program()
p.desc = desc p.desc = desc
# TODO(wangzhen): Block.vars/ops are not filled, should fix it.
p.blocks = [Block(p, i) for i in six.moves.range(p.desc.num_blocks())] p.blocks = [Block(p, i) for i in six.moves.range(p.desc.num_blocks())]
p._sync_with_cpp() p._sync_with_cpp()
return p return p
......
...@@ -185,8 +185,11 @@ class ParallelExecutor(object): ...@@ -185,8 +185,11 @@ class ParallelExecutor(object):
places = list(map(place_obj, self._places)) places = list(map(place_obj, self._places))
# step7: init ParallelExecutor # step7: init ParallelExecutor
# The ParallelExecutor API will be deprecated; it does not support parallel graph mode.
self._graphs = [core.Graph(main.desc)]
self.executor = core.ParallelExecutor( self.executor = core.ParallelExecutor(
places, persistable_vars, main.desc, places, persistable_vars, self._graphs,
cpt.to_text(loss_name) if loss_name else six.u(''), scope, cpt.to_text(loss_name) if loss_name else six.u(''), scope,
local_scopes, exec_strategy, build_strategy) local_scopes, exec_strategy, build_strategy)
......