提交 69b1ebdf 编写于 作者: S sneaxiy

merge develop

test=develop
......@@ -46,7 +46,7 @@ paddle.fluid.AsyncExecutor.init_worker ArgSpec(args=['self', 'dist_desc', 'start
paddle.fluid.AsyncExecutor.run ArgSpec(args=['self', 'program', 'data_feed', 'filelist', 'thread_num', 'fetch', 'mode', 'debug'], varargs=None, keywords=None, defaults=('', False))
paddle.fluid.AsyncExecutor.save_model ArgSpec(args=['self', 'save_path'], varargs=None, keywords=None, defaults=None)
paddle.fluid.AsyncExecutor.stop ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
paddle.fluid.CompiledProgram.__init__ ArgSpec(args=['self', 'program'], varargs=None, keywords=None, defaults=None)
paddle.fluid.CompiledProgram.__init__ ArgSpec(args=['self', 'program_or_graph'], varargs=None, keywords=None, defaults=None)
paddle.fluid.CompiledProgram.with_data_parallel ArgSpec(args=['self', 'loss_name', 'build_strategy', 'exec_strategy', 'share_vars_from', 'places'], varargs=None, keywords=None, defaults=(None, None, None, None, None))
paddle.fluid.CompiledProgram.with_inference_optimize ArgSpec(args=['self', 'config'], varargs=None, keywords=None, defaults=None)
paddle.fluid.ExecutionStrategy.__init__ __init__(self: paddle.fluid.core.ParallelExecutor.ExecutionStrategy) -> None
......
......@@ -163,6 +163,20 @@ std::vector<OpDesc *> BlockDesc::AllOps() const {
return res;
}
void BlockDesc::Clear() {
// clear all ops
ops_.clear();
// clear all vars which are not persistable
for (auto it = vars_.begin(); it != vars_.end();) {
if (it->second->Persistable()) {
++it;
} else {
vars_.erase(it++);
}
}
}
void BlockDesc::Flush() {
for (auto &op_desc : ops_) {
op_desc->Flush();
......
......@@ -97,6 +97,8 @@ class BlockDesc {
std::vector<OpDesc *> AllOps() const;
void Clear();
size_t OpSize() const { return ops_.size(); }
OpDesc *Op(int idx) const { return ops_.at(idx).get(); }
......
......@@ -50,7 +50,7 @@ std::unique_ptr<ir::Graph> AllReduceDepsPass::ApplyImpl(
std::unordered_map<std::string, int> vars;
// TODO(gongwb): use graph topology sort to find the order of operators.
// Note that must assert topology sort is stable
auto& ops = Get<const std::vector<OpDesc*>>(kAllOpDescs);
auto& ops = graph->Get<const std::vector<OpDesc*>>(kStaleProgramOpDescs);
for (auto* op_desc : ops) {
auto outputs = op_desc->Outputs();
for (auto& o_it : outputs) {
......@@ -120,4 +120,4 @@ std::unique_ptr<ir::Graph> AllReduceDepsPass::ApplyImpl(
REGISTER_PASS(all_reduce_deps_pass,
paddle::framework::details::AllReduceDepsPass)
.RequirePassAttr(paddle::framework::details::kAllOpDescs);
.RequireGraphAttr(paddle::framework::details::kStaleProgramOpDescs);
......@@ -174,7 +174,8 @@ bool BuildStrategy::IsMultiDevPass(const std::string &pass_name) const {
}
std::unique_ptr<ir::Graph> BuildStrategy::Apply(
const ProgramDesc &main_program, const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> graph,
const std::vector<platform::Place> &places,
const std::string &loss_var_name, const std::vector<Scope *> &local_scopes,
const size_t &nranks,
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
......@@ -185,7 +186,6 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
// Create a default one if not finalized by user.
CreatePassesFromStrategy(false);
std::unique_ptr<ir::Graph> graph(new ir::Graph(main_program));
for (std::shared_ptr<ir::Pass> &pass : pass_builder_->AllPasses()) {
if (IsMultiDevPass(pass->Type())) {
pass->Erase(kPlaces);
......@@ -203,41 +203,12 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
pass->Erase("nccl_ctxs");
pass->SetNotOwned<platform::NCCLContextMap>("nccl_ctxs", nctx);
#endif
} else if (pass->Type() == "memory_optimize_pass") {
if (graph->Has(kAllOpDescs)) {
graph->Erase(kAllOpDescs);
}
const std::vector<OpDesc *> *all_op_descs =
new std::vector<OpDesc *>(main_program.Block(0).AllOps());
graph->Set<const std::vector<OpDesc *>>(kAllOpDescs,
all_op_descs); // take ownership
pass->Erase(kAllOpDescs);
pass->SetNotOwned<const std::vector<OpDesc *>>(kAllOpDescs, all_op_descs);
} else if (pass->Type() == "sequential_execution_pass") {
LOG(INFO) << "set enable_sequential_execution:"
<< enable_sequential_execution_;
pass->Erase(kAllOpDescs);
pass->Set<const std::vector<OpDesc *>>(
kAllOpDescs,
new std::vector<OpDesc *>(main_program.Block(0).AllOps()));
} else if (pass->Type() == "all_reduce_deps_pass") {
LOG(INFO) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this)
<< ", num_trainers:" << num_trainers_;
pass->Erase(kAllOpDescs);
pass->Set<const std::vector<OpDesc *>>(
kAllOpDescs,
new std::vector<OpDesc *>(main_program.Block(0).AllOps()));
} else if (pass->Type() == "inplace_pass") {
if (graph->Has(kAllOpDescs)) {
graph->Erase(kAllOpDescs);
}
graph->Set<const std::vector<OpDesc *>>(
kAllOpDescs,
new std::vector<OpDesc *>(main_program.Block(0).AllOps()));
} else if (pass->Type() == "fuse_relu_depthwise_conv_pass") {
if (!use_cuda) {
LOG(WARNING) << "fuse_relu_depthwise_conv_pass is only supported on "
......
......@@ -114,7 +114,7 @@ struct BuildStrategy {
// Apply the passes built by the pass_builder_. The passes will be
// applied to the Program and output an ir::Graph.
std::unique_ptr<ir::Graph> Apply(const ProgramDesc &main_program,
std::unique_ptr<ir::Graph> Apply(std::unique_ptr<ir::Graph> graph,
const std::vector<platform::Place> &places,
const std::string &loss_var_name,
const std::vector<Scope *> &local_scopes,
......
......@@ -24,12 +24,11 @@ namespace details {
FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> &&graph)
const std::vector<platform::Place> &places, ir::Graph *graph)
: strategy_(strategy),
local_scopes_(local_scopes),
places_(places),
graph_(std::move(graph)),
graph_(graph),
pool_(strategy.num_threads_),
prepare_pool_(1), // add one more thread for generate op_deps
fetch_ctxs_(places) {
......@@ -110,14 +109,14 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
}
}
if (exception_.IsCaught()) {
ClearFetchOp(graph_.get(), &fetch_ops);
ClearFetchOp(graph_, &fetch_ops);
exception_.ReThrow();
}
}
num_complete += num_comp;
}
// Wait FetchOps.
ClearFetchOp(graph_.get(), &fetch_ops);
ClearFetchOp(graph_, &fetch_ops);
return fetches;
}
......
......@@ -32,7 +32,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
FastThreadedSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> &&graph);
ir::Graph *graph);
FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
const ir::Graph &Graph() const override;
......@@ -40,7 +40,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
ExecutionStrategy strategy_;
std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_;
std::unique_ptr<ir::Graph> graph_;
ir::Graph *graph_;
std::unordered_map<OpHandleBase *, int> op_deps_;
std::vector<OpHandleBase *> bootstrap_ops_;
......
......@@ -33,10 +33,10 @@ namespace details {
using paddle::framework::VarDesc;
std::vector<ir::Node*> SortOpLikeDescOrder(const ir::Graph& graph) {
PADDLE_ENFORCE(graph.Has(kAllOpDescs),
"Graph has no attribute of kAllOpDescs.");
PADDLE_ENFORCE(graph.Has(kStaleProgramOpDescs),
"Graph has no attribute of kStaleProgramOpDescs.");
// 1. get op desc order
auto& op_descs = graph.Get<const std::vector<OpDesc*>>(kAllOpDescs);
auto& op_descs = graph.Get<const std::vector<OpDesc*>>(kStaleProgramOpDescs);
// 2. topology sort order
auto nodes = graph.Nodes();
......@@ -461,11 +461,21 @@ void ControlFlowGraph::LiveVariableAnalysis() {
}
}
}
for (auto* op : ops_) {
unlived_vars_[op] = std::set<std::string>();
for (auto& var : this->LiveIn(op)) {
if (!this->LiveOut(op).count(var)) {
unlived_vars_[op].insert(var);
}
}
}
}
void ControlFlowGraph::RenameVarInCFGGraph(const std::string& old_node,
const std::string& new_node,
int begin_idx) {
std::vector<bool> need_update(ops_.size(), false);
// update graph from begin idx to the end
for (size_t i = begin_idx; i != ops_.size(); ++i) {
auto* op = ops_[i];
......@@ -480,15 +490,27 @@ void ControlFlowGraph::RenameVarInCFGGraph(const std::string& old_node,
if (live_in_[op].find(old_node) != live_in_[op].end()) {
live_in_[op].erase(old_node);
live_in_[op].insert(new_node);
need_update[i] = true;
}
if (live_out_[op].find(old_node) != live_out_[op].end()) {
live_out_[op].erase(old_node);
live_out_[op].insert(new_node);
need_update[i] = true;
}
}
for (size_t i = begin_idx; i < ops_.size(); ++i) {
if (!need_update[i]) continue;
auto* op = ops_[i];
for (auto& var : this->LiveIn(op)) {
if (!this->LiveOut(op).count(var)) {
unlived_vars_[op].insert(var);
}
}
}
}
const std::set<std::string> ControlFlowGraph::LiveIn(ir::Node* op) const {
const std::set<std::string>& ControlFlowGraph::LiveIn(ir::Node* op) const {
auto it = live_in_.find(op);
PADDLE_ENFORCE(
it != live_in_.end(),
......@@ -496,7 +518,7 @@ const std::set<std::string> ControlFlowGraph::LiveIn(ir::Node* op) const {
return it->second;
}
const std::set<std::string> ControlFlowGraph::LiveOut(ir::Node* op) const {
const std::set<std::string>& ControlFlowGraph::LiveOut(ir::Node* op) const {
auto it = live_out_.find(op);
PADDLE_ENFORCE(
it != live_out_.end(),
......@@ -504,15 +526,24 @@ const std::set<std::string> ControlFlowGraph::LiveOut(ir::Node* op) const {
return it->second;
}
const std::set<std::string> ControlFlowGraph::Use(ir::Node* op) const {
const std::set<std::string>& ControlFlowGraph::Use(ir::Node* op) const {
auto it = uses_.find(op);
PADDLE_ENFORCE(
it != uses_.end(),
string::Sprintf("Expect %s in live_out, but Not Found.", op->Name()));
string::Sprintf("Expect %s in use, but Not Found.", op->Name()));
return it->second;
}
const std::set<std::string>& ControlFlowGraph::Unlived(ir::Node* op) const {
auto it = unlived_vars_.find(op);
PADDLE_ENFORCE(
it != unlived_vars_.end(),
string::Sprintf("Expect %s in unlived_set, but Not Found.", op->Name()));
return it->second;
return it->second;
}
const std::vector<ir::Node*> ControlFlowGraph::Ops() const { return ops_; }
const std::vector<ir::Node*>& ControlFlowGraph::Ops() const { return ops_; }
std::vector<ir::Node*>& ControlFlowGraph::Ops() { return ops_; }
......
......@@ -92,10 +92,11 @@ class ControlFlowGraph {
void RenameVarInCFGGraph(const std::string& old_node,
const std::string& new_node, int begin_idx);
const std::set<std::string> LiveIn(ir::Node* op) const;
const std::set<std::string> LiveOut(ir::Node* op) const;
const std::set<std::string> Use(ir::Node* op) const;
const std::vector<ir::Node*> Ops() const;
const std::set<std::string>& LiveIn(ir::Node* op) const;
const std::set<std::string>& LiveOut(ir::Node* op) const;
const std::set<std::string>& Use(ir::Node* op) const;
const std::set<std::string>& Unlived(ir::Node* op) const;
const std::vector<ir::Node*>& Ops() const;
std::vector<ir::Node*>& Ops();
// for ssa-graph nodes
......@@ -117,6 +118,7 @@ class ControlFlowGraph {
VarSetMap live_out_;
VarSetMap uses_; // op inputs
VarSetMap defs_; // op outputs
std::unordered_map<ir::Node*, std::set<std::string>> unlived_vars_;
std::vector<ir::Node*> ops_; // op sequence by topology sort
};
......
......@@ -228,9 +228,6 @@ TEST(CFGGraph, IRGraph) {
// prepare ir graph
auto prog = FillProgramDesc();
ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
ControlFlowGraph cfg(graph);
cfg.LiveVariableAnalysis();
......@@ -256,9 +253,6 @@ TEST(CFGGraph, IRGraph) {
TEST(SortOpLikeDescOrder, NormalTest) {
auto prog = FillProgramDesc();
ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
auto nodes = SortOpLikeDescOrder(graph);
auto op_descs = prog.Block(0).AllOps();
......@@ -273,9 +267,6 @@ TEST(SortOpLikeDescOrder, NormalTest) {
TEST(SortOpLikeDescOrder, RemoveOpDesc) {
auto prog = FillProgramDesc();
ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
auto nodes = graph.Nodes();
auto op_descs = prog.Block(0).AllOps();
ir::Node* found_node = nullptr;
......@@ -324,8 +315,6 @@ TEST(SortOpLikeDescOrder, RemoveOpDesc) {
// 3. add some op_desc
TEST(SortOpLikeDescOrder, AddOpDesc) {
auto prog = FillProgramDesc();
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
ir::Graph graph(prog);
auto find_node_in_graph = [&](std::string s) {
......@@ -342,9 +331,7 @@ TEST(SortOpLikeDescOrder, AddOpDesc) {
// cached desc different with real one
// mimic the intermidiete pass modify the programdesc.
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
auto op_descs = prog.Block(0).AllOps();
std::vector<OpDesc*> op_descs = graph.OriginProgram().Block(0).AllOps();
auto op = prog.MutableBlock(0)->AppendOp();
prog.MutableBlock(0)->Var("d1")->SetType(proto::VarType::LOD_TENSOR);
......@@ -376,9 +363,6 @@ TEST(SortOpLikeDescOrder, AddOpDesc) {
TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) {
auto prog = FillProgramDesc();
ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
auto find_node_in_graph = [&](std::string s) {
ir::Node* ret = nullptr;
......@@ -392,8 +376,9 @@ TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) {
return ret;
};
std::vector<OpDesc*> op_descs = graph.OriginProgram().Block(0).AllOps();
// remove sum node
auto op_descs = prog.Block(0).AllOps();
ir::Node* found_node = nullptr;
auto nodes = graph.Nodes();
for (auto node : nodes) {
......@@ -454,9 +439,7 @@ TEST(SortOpLikeDescOrder, AddAndDeleteOpDesc) {
TEST(SortOpLikeDescOrder, AddAndReplaceOpDescInplace) {
auto prog = FillProgramDesc();
ir::Graph graph(prog);
const std::vector<OpDesc*>* all_op_descs =
new std::vector<OpDesc*>(prog.Block(0).AllOps());
graph.Set(details::kAllOpDescs, all_op_descs); // take ownership
std::vector<OpDesc*> op_descs = graph.OriginProgram().Block(0).AllOps();
auto find_node_in_graph = [&](std::string s) {
ir::Node* ret = nullptr;
......@@ -470,7 +453,6 @@ TEST(SortOpLikeDescOrder, AddAndReplaceOpDescInplace) {
return ret;
};
auto op_descs = prog.Block(0).AllOps();
// add node
auto op = prog.MutableBlock(0)->AppendOp();
prog.MutableBlock(0)->Var("d1")->SetType(proto::VarType::LOD_TENSOR);
......
......@@ -118,8 +118,7 @@ std::unique_ptr<ir::Graph> MemoryOptimizePass::ApplyImpl(
}
}
// fill the pool
for (auto var : cfg_->LiveIn(op)) {
if (cfg_->LiveOut(op).count(var) == 0) {
for (auto& var : cfg_->Unlived(op)) {
ir::Node* var_node = cfg_->GetNodeByName(var, op);
if (var_node == nullptr || var_node->IsCtrlVar()) continue;
if (NodeCanReused(var_node) && !pool_.Has(var_node)) {
......@@ -127,7 +126,6 @@ std::unique_ptr<ir::Graph> MemoryOptimizePass::ApplyImpl(
}
}
}
}
graph->ResolveHazard(var_nodes_);
return graph;
......@@ -337,4 +335,4 @@ void MemoryOptimizePass::RenameVarInGraphNode(const std::string& var,
REGISTER_PASS(memory_optimize_pass,
paddle::framework::details::MemoryOptimizePass)
.RequireGraphAttr(paddle::framework::details::kAllOpDescs);
.RequireGraphAttr(paddle::framework::details::kStaleProgramOpDescs);
......@@ -20,8 +20,7 @@ namespace framework {
namespace details {
std::vector<std::unique_ptr<ir::Graph>>
ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(
std::unique_ptr<ir::Graph> &&graph) {
ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(ir::Graph *graph) {
std::vector<std::unique_ptr<ir::Graph>> graphs;
graphs.reserve(places_.size());
for (size_t i = 0; i < places_.size(); ++i) {
......@@ -77,24 +76,18 @@ ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(
ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const framework::ProgramDesc &main_prog, std::unique_ptr<ir::Graph> &&graph)
const std::vector<platform::Place> &places, ir::Graph *graph)
: strategy_(std::move(strategy)),
local_scopes_(std::move(local_scopes)),
pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr),
places_(std::move(places)),
main_prog_(main_prog),
// TODO(Yancey1989): Copying graphs is not safely since it deleted the
// attrs.
graphs_(SeparateMultiDevicesGraph(std::move(graph))) {
graphs_(SeparateMultiDevicesGraph(graph)) {
PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
auto seq_allreduce_pass =
ir::PassRegistry::Instance().Get("all_reduce_deps_pass");
seq_allreduce_pass->Erase(details::kAllOpDescs);
seq_allreduce_pass->Set<const std::vector<OpDesc *>>(
details::kAllOpDescs,
new std::vector<OpDesc *>(main_prog_.Block(0).AllOps()));
for (size_t i = 0; i < graphs_.size(); ++i) {
graphs_[i] = seq_allreduce_pass->Apply(std::move(graphs_[i]));
}
......@@ -107,7 +100,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
<< " to run the operators of the graph on each device.";
for (size_t i = 0; i < places.size(); ++i) {
executors_.emplace_back(new details::ThreadedSSAGraphExecutor(
strategy_, local_scopes_, {places_[i]}, std::move(graphs_.at(i))));
strategy_, local_scopes_, {places_[i]}, graphs_.at(i).get()));
}
}
......
......@@ -31,8 +31,7 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor {
ParallelSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
const framework::ProgramDesc &main_prog,
std::unique_ptr<ir::Graph> &&graph);
ir::Graph *graph);
~ParallelSSAGraphExecutor() final = default;
const ir::Graph &Graph() const override { return *graphs_[0]; }
......@@ -41,13 +40,12 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor {
private:
std::vector<std::unique_ptr<ir::Graph>> SeparateMultiDevicesGraph(
std::unique_ptr<ir::Graph> &&graph);
ir::Graph *graph);
ExecutionStrategy strategy_;
std::vector<Scope *> local_scopes_;
std::unique_ptr<::ThreadPool> pool_{nullptr};
std::vector<platform::Place> places_;
framework::ProgramDesc main_prog_;
std::vector<std::unique_ptr<ir::Graph>> graphs_;
std::vector<std::unique_ptr<details::ThreadedSSAGraphExecutor>> executors_;
......
......@@ -40,7 +40,7 @@ std::unique_ptr<ir::Graph> SequentialExecutionPass::ApplyImpl(
static std::unordered_set<std::string> skip_dist_ops{
"send", "recv", "send_barrier", "fetch_barrier"};
auto &ops = Get<const std::vector<OpDesc *>>(kAllOpDescs);
auto &ops = graph->Get<const std::vector<OpDesc *>>(kStaleProgramOpDescs);
std::vector<ir::Node *> op_node_list;
op_node_list.reserve(ops.size());
......@@ -107,4 +107,4 @@ std::unique_ptr<ir::Graph> SequentialExecutionPass::ApplyImpl(
REGISTER_PASS(sequential_execution_pass,
paddle::framework::details::SequentialExecutionPass)
.RequirePassAttr(paddle::framework::details::kAllOpDescs);
.RequireGraphAttr(paddle::framework::details::kStaleProgramOpDescs);
......@@ -23,9 +23,8 @@ namespace framework {
namespace details {
ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> &&graph)
: graph_(std::move(graph)),
const std::vector<platform::Place> &places, ir::Graph *graph)
: graph_(graph),
pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_)
: nullptr),
local_scopes_(local_scopes),
......@@ -110,7 +109,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
for (auto &run_op_future : run_op_futures_) {
run_op_future.wait();
}
ClearFetchOp(graph_.get(), &fetch_ops);
ClearFetchOp(graph_, &fetch_ops);
exception_holder_.ReThrow();
} else {
continue;
......@@ -135,7 +134,7 @@ FeedFetchList ThreadedSSAGraphExecutor::Run(
}
PADDLE_ENFORCE(ready_ops.empty());
// Wait FetchOps.
ClearFetchOp(graph_.get(), &fetch_ops);
ClearFetchOp(graph_, &fetch_ops);
return fetch_data;
}
......
......@@ -41,7 +41,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> &&graph);
ir::Graph *graph);
const ir::Graph &Graph() const override { return *graph_; }
// Run a SSAGraph by a thread pool
......@@ -55,7 +55,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor {
details::OpHandleBase *op);
private:
std::unique_ptr<ir::Graph> graph_;
ir::Graph *graph_;
std::unique_ptr<::ThreadPool> pool_;
std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_;
......
......@@ -76,6 +76,9 @@ std::map<std::string, std::vector<ir::Node *>> Graph::InitFromProgram(
var->inputs.push_back(node);
}
}
Set<const std::vector<OpDesc *>>(
details::kStaleProgramOpDescs,
new std::vector<OpDesc *>(program.Block(0).AllOps()));
return var_nodes;
}
......
......@@ -31,7 +31,7 @@ namespace details {
// This attr is not recommended, because the graph should not dependence
// the program once it is built.
constexpr char kAllOpDescs[] = "all_op_descs";
constexpr char kStaleProgramOpDescs[] = "stale_program_op_descs";
} // namespace details
namespace ir {
......@@ -195,6 +195,12 @@ class Graph {
return nullptr;
}
// Returns reference to the original program.
// WARN: After a series of passes, the current graph can be quite
// different from OriginProgram. Caller shouldn't assume much from
// the returned OriginProgram.
const ProgramDesc &OriginProgram() const { return program_; }
// This method takes ownership of `node`.
ir::Node *AddNode(ir::Node *node) {
PADDLE_ENFORCE(node_set_.find(node) == node_set_.end());
......
......@@ -44,10 +44,14 @@ struct TestIsReachable {
using func = std::function<bool(const std::string&, const std::string&)>;
auto operator()(const std::unique_ptr<ir::Graph>& graph) -> func {
auto find_node = [](const std::unique_ptr<ir::Graph>& graph,
auto hash = [](const Node* node) -> std::string {
return node->Name() + std::to_string(node->id());
};
auto find_node = [&](const std::unique_ptr<ir::Graph>& graph,
const std::string& name) -> Node* {
for (auto& node : GraphTraits::DFS(*graph)) {
if (name == node.Name()) {
if (name == hash(&node)) {
return &node;
}
}
......@@ -55,13 +59,17 @@ struct TestIsReachable {
return nullptr;
};
return [&](std::string from, const std::string to) -> bool {
// update the from and to strings to hashed equivs in loop from graph traits
return [&](std::string from, std::string to) -> bool {
if (from == to) return true;
std::map<std::string, bool> visited;
for (auto& node : GraphTraits::DFS(*graph)) {
visited[node.Name()] = false;
auto hashed = hash(&node);
if (node.Name() == from) from = hashed;
if (node.Name() == to) to = hashed;
visited[hashed] = false;
}
visited[from] = true;
......@@ -72,15 +80,15 @@ struct TestIsReachable {
while (!queue.empty()) {
auto cur = find_node(graph, queue.front());
queue.pop_front();
if (cur == nullptr) return false;
for (auto n : cur->outputs) {
if (n->Name() == to) return true;
auto hashed_name = hash(n);
if (hashed_name == to) return true;
if (!visited[n->Name()]) {
visited[n->Name()] = true;
queue.push_back(n->Name());
if (!visited[hashed_name]) {
visited[hashed_name] = true;
queue.push_back(hashed_name);
}
}
}
......@@ -166,6 +174,28 @@ TEST(ConvElementwiseAddMKLDNNFusePass, ConvolutionAsYWithElementwiseAddRelu) {
RunPassAndAssert(&prog, "a", "relu", 1);
}
TEST(ConvElementwiseAddMKLDNNFusePass,
ConvolutionProjectionAsYWithElementwiseAddRelu) {
auto prog = BuildProgramDesc({"a", "b", "c", "d", "e", "f"},
{"bias", "weights", "bias2", "weights2"});
SetOp(&prog, "sigmoid", {{"X", "a"}}, {"Out", "b"});
// right branch
SetOp(&prog, "conv2d",
{{"Input", "b"}, {"Bias", "bias"}, {"Filter", "weights"}},
{"Output", "c"});
// left branch
SetOp(&prog, "conv2d",
{{"Input", "a"}, {"Bias", "bias2"}, {"Filter", "weights2"}},
{"Output", "f"});
SetOp(&prog, "elementwise_add", {{"X", "f"}, {"Y", "c"}}, {"Out", "d"});
SetOp(&prog, "relu", {{"X", "d"}}, {"Out", "e"});
RunPassAndAssert(&prog, "a", "relu", 2);
}
TEST(ConvElementwiseAddMKLDNNFusePass,
ConvolutionAsYWithElementwiseAddReluNoBias) {
auto prog = BuildProgramDesc({"a", "b", "c", "d", "e"}, {"weights"});
......
......@@ -184,9 +184,10 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
ParallelExecutor::ParallelExecutor(
const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program, const std::string &loss_var_name,
Scope *scope, const std::vector<Scope *> &local_scopes,
const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy)
const std::string &loss_var_name, Scope *scope,
const std::vector<Scope *> &local_scopes,
const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
ir::Graph *graph)
: member_(new ParallelExecutorPrivate(places)) {
member_->global_scope_ = scope;
member_->use_cuda_ = exec_strategy.use_cuda_;
......@@ -216,11 +217,13 @@ ParallelExecutor::ParallelExecutor(
}
}
std::unique_ptr<ir::Graph> temp_owned_graph(graph);
// FIXME(Yancey1989): parallel graph mode get better performance
// in GPU allreduce distributed training. Need an elegant way to
// choice the execution strategy.
build_strategy.enable_parallel_graph_ =
EnableParallelGraphExecution(main_program, exec_strategy, build_strategy);
build_strategy.enable_parallel_graph_ = EnableParallelGraphExecution(
*temp_owned_graph, exec_strategy, build_strategy);
if (build_strategy.enable_parallel_graph_)
VLOG(0) << "The Executor would execute the graph by ParallelGraph "
"Execution which can get better performance,"
......@@ -254,26 +257,32 @@ ParallelExecutor::ParallelExecutor(
if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
BCastParamsToDevices(bcast_vars);
}
// Startup Program has been run. All local scopes has correct parameters.
// Startup Program has been run. All local scopes has correct parameters.
// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
std::unique_ptr<ir::Graph> graph;
// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
graph = build_strategy.Apply(main_program, member_->places_, loss_var_name,
member_->local_scopes_, member_->nranks_,
member_->use_cuda_, member_->nccl_ctxs_.get());
temp_owned_graph = build_strategy.Apply(
std::move(temp_owned_graph), member_->places_, loss_var_name,
member_->local_scopes_, member_->nranks_, member_->use_cuda_,
member_->nccl_ctxs_.get());
#else
graph = build_strategy.Apply(main_program, member_->places_, loss_var_name,
member_->local_scopes_, member_->nranks_,
member_->use_cuda_);
temp_owned_graph = build_strategy.Apply(
std::move(temp_owned_graph), member_->places_, loss_var_name,
member_->local_scopes_, member_->nranks_, member_->use_cuda_);
#endif
auto max_memory_size = GetEagerDeletionThreshold();
VLOG(10) << "Eager Deletion Threshold "
<< static_cast<float>(max_memory_size) / (1 << 30);
if (max_memory_size >= 0) {
graph = member_->PrepareGCAndRefCnts(std::move(graph),
static_cast<size_t>(max_memory_size));
graph = member_
->PrepareGCAndRefCnts(std::move(temp_owned_graph),
static_cast<size_t>(max_memory_size))
.release();
} else {
graph = temp_owned_graph.release();
}
// Step 3. Create vars in each scope. Passes may also create new vars.
......@@ -308,8 +317,7 @@ ParallelExecutor::ParallelExecutor(
// TODO(Yancey1989): Remove passing in the main_program when
// allreduce_seq_pass doesn't need it as the attr.
member_->executor_.reset(new details::ParallelSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_, main_program,
std::move(graph)));
exec_strategy, member_->local_scopes_, member_->places_, graph));
#else
PADDLE_THROW(
"Paddle should be compiled with CUDA for ParallelGraph Execution.");
......@@ -317,12 +325,10 @@ ParallelExecutor::ParallelExecutor(
} else {
if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graph)));
exec_strategy, member_->local_scopes_, member_->places_, graph));
} else {
member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graph)));
exec_strategy, member_->local_scopes_, member_->places_, graph));
}
}
......@@ -452,26 +458,35 @@ void ParallelExecutor::FeedAndSplitTensorIntoLocalScopes(
}
}
ParallelExecutor::~ParallelExecutor() {
for (auto &p : member_->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
delete member_;
}
bool ParallelExecutor::EnableParallelGraphExecution(
const ProgramDesc &main_program, const ExecutionStrategy &exec_strategy,
const ir::Graph &graph, const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy) const {
if (!FLAGS_enable_parallel_graph) return false;
bool enable_parallel_graph = true;
for (ir::Node *node : graph.Nodes()) {
if (node->IsVar() && node->Var()) {
// TODO(Yancey1989): support sparse update in ParallelGraph mode.
for (auto &var_desc : main_program.Block(0).AllVars()) {
if (var_desc->GetType() == proto::VarType::SELECTED_ROWS) {
if (node->Var()->GetType() == proto::VarType::SELECTED_ROWS) {
enable_parallel_graph = false;
break;
}
}
} else if (node->IsOp() && node->Op()) {
// TODO(Yancey1989): support pserver mode
for (auto &op_desc : main_program.Block(0).AllOps()) {
if (op_desc->Type() == "send" || op_desc->Type() == "recv") {
if (node->Op()->Type() == "send" || node->Op()->Type() == "recv") {
enable_parallel_graph = false;
break;
}
}
}
if (!member_->use_all_reduce_ || !member_->use_cuda_)
......@@ -481,13 +496,6 @@ bool ParallelExecutor::EnableParallelGraphExecution(
return enable_parallel_graph;
}
ParallelExecutor::~ParallelExecutor() {
for (auto &p : member_->places_) {
platform::DeviceContextPool::Instance().Get(p)->Wait();
}
delete member_;
}
} // namespace framework
} // namespace paddle
......
......@@ -46,11 +46,11 @@ class ParallelExecutor {
public:
explicit ParallelExecutor(const std::vector<platform::Place> &places,
const std::unordered_set<std::string> &bcast_vars,
const ProgramDesc &main_program,
const std::string &loss_var_name, Scope *scope,
const std::vector<Scope *> &local_scopes,
const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy);
const BuildStrategy &build_strategy,
ir::Graph *graph);
~ParallelExecutor();
......@@ -71,7 +71,7 @@ class ParallelExecutor {
private:
void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
bool EnableParallelGraphExecution(const ProgramDesc &main_program,
bool EnableParallelGraphExecution(const ir::Graph &graph,
const ExecutionStrategy &exec_strategy,
const BuildStrategy &build_strategy) const;
......
......@@ -114,23 +114,23 @@ class VarBase {
public:
VarBase() : VarBase(new framework::Variable(), new VarBase(true)) {}
// Owns `var` and `grad`
explicit VarBase(bool stop_gradient)
: VarBase(new framework::Variable(),
stop_gradient ? nullptr : new VarBase(true), stop_gradient) {}
VarBase(framework::Variable* var, VarBase* grad)
: VarBase(var, grad, false) {}
private:
VarBase(framework::Variable* var, VarBase* grad, bool stop_gradient)
: var_desc_(nullptr),
var_(var),
grads_(grad),
stop_gradient_(false),
pre_op_(nullptr),
pre_op_out_idx_(-1) {}
explicit VarBase(bool stop_gradient)
: var_desc_(nullptr),
var_(new framework::Variable()),
grads_(stop_gradient ? nullptr : new VarBase(true)),
stop_gradient_(stop_gradient),
pre_op_(nullptr),
pre_op_out_idx_(-1) {}
public:
virtual ~VarBase() {
if (var_) {
delete var_;
......@@ -141,11 +141,13 @@ class VarBase {
}
}
OpBase* PreOp() const { return pre_op_; }
int PreOpOutIdx() const { return pre_op_out_idx_; }
inline OpBase* PreOp() const { return pre_op_; }
inline int PreOpOutIdx() const { return pre_op_out_idx_; }
void SetStopGradient(bool stop_gradient) { stop_gradient_ = stop_gradient; }
bool IsStopGradient() const { return stop_gradient_; }
inline void SetStopGradient(bool stop_gradient) {
stop_gradient_ = stop_gradient;
}
inline bool IsStopGradient() const { return stop_gradient_; }
void RunBackward();
......
......@@ -14,6 +14,8 @@
#include "paddle/fluid/imperative/tracer.h"
#include <set>
#include "paddle/fluid/operators/math/math_function.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/enforce.h"
......@@ -66,8 +68,9 @@ platform::Place GetExpectedPlace(platform::Place place, VarBasePtrMap inputs) {
return result;
}
void Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
const VarBasePtrMap& outputs, framework::BlockDesc* block,
std::set<std::string> Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
const VarBasePtrMap& outputs,
framework::BlockDesc* block,
const platform::Place expected_place,
const bool stop_gradient) {
std::map<std::string, VarBase*> vars;
......@@ -76,6 +79,7 @@ void Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
VLOG(3) << "tracer tracing " << op_desc->Type();
op_desc->InferShape(*block);
op_desc->InferVarType(block);
std::unique_ptr<framework::OperatorBase> op_base =
framework::OpRegistry::CreateOp(*op_desc);
......@@ -92,7 +96,7 @@ void Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
invars.emplace_back(inp->var_);
vars[inp->var_desc_->Name()] = inp;
if (inp->PreOp()) {
if (inp->PreOp() && !inp->IsStopGradient()) {
op->pre_ops_[it.first].push_back(inp->PreOp());
op->pre_ops_out_idx_[it.first].push_back(inp->PreOpOutIdx());
} else {
......@@ -142,6 +146,8 @@ void Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
framework::ExecutionContext(prepared_op.op, scope, *prepared_op.dev_ctx,
prepared_op.ctx, prepared_op.kernel_configs));
std::set<std::string> vars_saved_for_backward;
if (!stop_gradient) {
std::unique_ptr<std::unordered_map<std::string, std::string>> grad_to_var(
new std::unordered_map<std::string, std::string>());
......@@ -161,6 +167,7 @@ void Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
PADDLE_ENFORCE(fwd_var_it != vars.end());
// Forward inputs or outputs.
grad_in_vars.push_back(fwd_var_it->second->var_);
vars_saved_for_backward.insert(it.first);
} else {
VarBase* var = vars[var_it->second];
if (!var->grads_->var_->IsInitialized()) {
......@@ -194,6 +201,7 @@ void Tracer::Trace(OpBase* op, const VarBasePtrMap& inputs,
}
op->block_ = block;
return vars_saved_for_backward;
}
std::vector<VarBase*> Tracer::PyTrace(OpBase* op,
......@@ -203,7 +211,7 @@ std::vector<VarBase*> Tracer::PyTrace(OpBase* op,
op->input_vars_[PyLayer::kFwdInp] = inputs;
op->output_vars_[PyLayer::kFwdOut] = PyLayer::Apply(op->forward_id_, inputs);
for (VarBase* inp : inputs) {
if (inp->PreOp()) {
if (inp->PreOp() && !inp->IsStopGradient()) {
op->pre_ops_[PyLayer::kFwdInp].push_back(inp->PreOp());
op->pre_ops_out_idx_[PyLayer::kFwdInp].push_back(inp->PreOpOutIdx());
} else {
......
......@@ -15,6 +15,7 @@
#pragma once
#include <map>
#include <set>
#include <string>
#include <vector>
......@@ -43,8 +44,9 @@ class Tracer {
virtual ~Tracer() {}
void Trace(OpBase* op, const VarBasePtrMap& inputs,
const VarBasePtrMap& outputs, framework::BlockDesc* block,
std::set<std::string> Trace(OpBase* op, const VarBasePtrMap& inputs,
const VarBasePtrMap& outputs,
framework::BlockDesc* block,
const platform::Place expected_place,
const bool stop_gradient = false);
......
// Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <vector>
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
#include "paddle/fluid/framework/var_type.h"
#include "paddle/fluid/operators/math/math_function.h"
namespace paddle {
namespace operators {
static framework::proto::VarType::Type kDefaultDtype =
framework::proto::VarType::Type::VarType_Type_BOOL;
template <typename DeviceContext, typename T>
class AllocContinuousSpaceKernel : public framework::OpKernel<T> {
public:
void Compute(const framework::ExecutionContext &context) const override {
auto &in_var_names = context.Inputs("Input");
auto &out_var_names = context.Outputs("Output");
auto &in_vars = context.MultiInputVar("Input");
auto out_vars = context.MultiOutputVar("Output");
PADDLE_ENFORCE_GT(in_var_names.size(), static_cast<size_t>(0));
PADDLE_ENFORCE_EQ(in_var_names.size(), out_var_names.size());
for (size_t i = 0; i < in_var_names.size(); ++i) {
// Only support LoDTensor
PADDLE_ENFORCE_NOT_NULL(in_vars[i], "%s should not be nullptr,",
in_var_names[i]);
PADDLE_ENFORCE_NOT_NULL(out_vars[i], "%s should not be nullptr,",
out_var_names[i]);
PADDLE_ENFORCE(in_vars[i]->IsType<framework::LoDTensor>());
PADDLE_ENFORCE(out_vars[i]->IsType<framework::LoDTensor>());
}
auto in_tensors = context.MultiInput<framework::LoDTensor>("Input");
if (context.Attr<bool>("check_name")) {
for (size_t i = 0; i < in_var_names.size(); ++i) {
PADDLE_ENFORCE_EQ(in_var_names[i], out_var_names[i]);
}
} else {
// Init the output as input
for (size_t i = 0; i < in_tensors.size(); ++i) {
out_vars[i]->GetMutable<framework::LoDTensor>()->Resize(
in_tensors[i]->dims());
}
}
auto &dev_ctx = context.template device_context<DeviceContext>();
// Get numel and dtype
size_t numel = 0;
auto dtype = kDefaultDtype;
GetMemSizeAndDtype(in_tensors, in_var_names, &numel, &dtype);
// Alloc the continuous space
auto fused_tensor = context.Output<framework::LoDTensor>("FusedOutput");
fused_tensor->Resize(framework::make_ddim({static_cast<int64_t>(numel)}))
.mutable_data(context.GetPlace(), dtype);
// Init the continuous space
auto out_tensors = context.MultiOutput<framework::LoDTensor>("Output");
int64_t offset = 0;
if (context.Attr<bool>("copy_data")) {
for (size_t i = 0; i < in_var_names.size(); ++i) {
int64_t len = out_tensors[i]->numel();
auto sub_tensor = fused_tensor->Slice(offset, offset + len);
offset += len;
framework::TensorCopy(*out_tensors[i], context.GetPlace(), dev_ctx,
&sub_tensor);
}
} else if (context.Attr<bool>("set_constant")) {
math::SetConstant<DeviceContext, T> set_constant;
set_constant(dev_ctx, fused_tensor,
static_cast<T>(context.Attr<float>("constant")));
}
// Make the outputs point to the continuous space.
offset = 0;
for (size_t i = 0; i < out_tensors.size(); ++i) {
int64_t len = out_tensors[i]->numel();
auto dim = out_tensors[i]->dims();
out_tensors[i]
->ShareDataWith(fused_tensor->Slice(offset, offset + len))
.Resize(dim);
offset += len;
VLOG(10) << "alloc_space_for_vars: output(" << out_var_names[i]
<< ") ,dim:(" << dim << ")"
<< " Address: " << out_tensors[i]->data<void>();
}
}
void GetMemSizeAndDtype(
const std::vector<const framework::LoDTensor *> &lod_tensors,
const std::vector<std::string> var_names, size_t *numel,
framework::proto::VarType::Type *dtype) const {
PADDLE_ENFORCE_EQ(lod_tensors.size(), var_names.size());
*numel = 0;
for (size_t i = 0; i < var_names.size(); ++i) {
PADDLE_ENFORCE(lod_tensors[i]->IsInitialized(), "%s is not initialized.",
var_names[i]);
auto p_dtype = lod_tensors[i]->type();
if (*dtype == kDefaultDtype) {
PADDLE_ENFORCE_NE(p_dtype, kDefaultDtype, "%s's type should not be %s.",
var_names[i], kDefaultDtype);
*dtype = p_dtype;
}
PADDLE_ENFORCE_EQ(p_dtype, *dtype, "Input vars is not equal.");
auto size = lod_tensors[i]->numel();
PADDLE_ENFORCE_GT(size, 0);
VLOG(10) << "alloc_space_for_vars: input(" << var_names[i] << ") ,dim:("
<< lod_tensors[i]->dims() << ")";
*numel += size;
}
}
};
class AllocContinuousSpaceOp : public framework::OperatorWithKernel {
public:
using framework::OperatorWithKernel::OperatorWithKernel;
void InferShape(framework::InferShapeContext *ctx) const override {}
};
class AllocContinuousSpaceOpMaker : public framework::OpProtoAndCheckerMaker {
public:
void Make() override {
AddInput("Input",
"(vector<LoDTensor>) The input tensors of"
" alloc_continuous_space operator.")
.AsDuplicable();
AddOutput("Output",
"(vector<LoDTensor>) The output "
"tensors of alloc_continuous_space operator. And the address "
"of output tensors are continuous, they are sliced from the "
"tensor of FusedOutput.")
.AsDuplicable();
AddOutput("FusedOutput",
"(LoDTensor) The output tensor "
"of alloc_continuous_space operator. And the tensors of"
" Output is sliced from the tensor of FusedOutput.");
AddAttr<bool>("copy_data", "Whether to copy the Input value to Output.")
.SetDefault(false);
AddAttr<bool>("set_constant",
"Whether to set the Output with a constant value.")
.SetDefault(false);
AddAttr<float>("constant",
"If set_constant is true, the constant value will be used "
"to set the Output.")
.SetDefault(0.0);
AddAttr<bool>("check_name",
"Whether to check the name of Input and Output to ensure "
"they are the same separately.")
.SetDefault(false);
AddComment(R"DOC(
AllocContinuousSpace Operator.
alloc_continuous_space is used to make the address of Output
continuous according to the Input. This Op will alloc a big tensor
according to the tensors of Input, the dtype is the same with those input tensors,
the size is the sum of those input tensors' numel, and the dim of the big
tensor is {sum(numel)}. And the big tensor is stored in FusedOutput.
The tensors of Output are sliced from the tensor of FusedOutput.
Note that, the dtype of Input should be the same, and the dim of Input
and Output should equal.
The tensors of Input and Output could be the same or different. And
alloc_continuous_space allows copying the value of Input to Output, or
setting the Output with a constant value.
)DOC");
}
};
} // namespace operators
} // namespace paddle
REGISTER_OPERATOR(alloc_continuous_space,
paddle::operators::AllocContinuousSpaceOp,
paddle::operators::AllocContinuousSpaceOpMaker);
namespace ops = paddle::operators;
REGISTER_OP_CPU_KERNEL(
alloc_continuous_space,
ops::AllocContinuousSpaceKernel<paddle::platform::CPUDeviceContext, int>,
ops::AllocContinuousSpaceKernel<paddle::platform::CPUDeviceContext, float>,
ops::AllocContinuousSpaceKernel<paddle::platform::CPUDeviceContext,
double>);
#ifdef PADDLE_WITH_CUDA
REGISTER_OP_CUDA_KERNEL(
alloc_continuous_space,
ops::AllocContinuousSpaceKernel<paddle::platform::CUDADeviceContext, int>,
ops::AllocContinuousSpaceKernel<paddle::platform::CUDADeviceContext, float>,
ops::AllocContinuousSpaceKernel<paddle::platform::CUDADeviceContext,
double>);
#endif
......@@ -172,6 +172,10 @@ class PriorBoxOpKernel : public framework::OpKernel<T> {
framework::make_ddim({1, static_cast<int>(variances.size())}),
ctx.GetPlace());
auto var_et = framework::EigenTensor<T, 2>::From(var_t);
#ifdef PADDLE_WITH_MKLML
#pragma omp parallel for
#endif
for (size_t i = 0; i < variances.size(); ++i) {
var_et(0, i) = variances[i];
}
......@@ -181,8 +185,15 @@ class PriorBoxOpKernel : public framework::OpKernel<T> {
vars->Resize({box_num, static_cast<int>(variances.size())});
auto e_vars = framework::EigenMatrix<T, Eigen::RowMajor>::From(*vars);
e_vars = var_et.broadcast(Eigen::DSizes<int, 2>(box_num, 1));
#ifdef PADDLE_WITH_MKLML
#pragma omp parallel for collapse(2)
#endif
for (int i = 0; i < box_num; ++i) {
for (int j = 0; j < variances.size(); ++j) {
e_vars(i, j) = variances[j];
}
}
vars->Resize(var_dim);
}
}; // namespace operators
......
......@@ -79,15 +79,6 @@ class SumMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
memory::format input_format = input0.format();
if (src_tz.size() == 1 && (input_format == memory::format::nchw ||
input_format == memory::format::nhwc)) {
input_format = memory::format::x;
}
if (src_tz.size() == 2 && (input_format == memory::format::nchw ||
input_format == memory::format::nhwc)) {
input_format = memory::format::nc;
}
for (int i = 0; i < N; i++) {
PADDLE_ENFORCE(in_vars[i]->IsType<LoDTensor>(),
"all inputs must be all LoDTensors");
......@@ -147,105 +138,10 @@ class SumMKLDNNOpKernel : public paddle::framework::OpKernel<T> {
output->set_layout(DataLayout::kMKLDNN);
output->set_format(output_format);
} else if (out_var->IsType<framework::SelectedRows>()) {
// TODO(@mozga-intel) Add MKLDNN SelectedRows support
std::unique_ptr<framework::SelectedRows> in0;
if (in_place) {
// If is in_place, we store the input[0] to in0
auto& in_sel0 = in_vars[0]->Get<SelectedRows>();
auto& rows = in_sel0.rows();
in0.reset(new framework::SelectedRows(rows, in_sel0.height()));
in0->mutable_value()->ShareDataWith(in_sel0.value());
}
auto get_selected_row = [&](size_t i) -> const SelectedRows& {
if (i == 0 && in0) {
return *in0;
} else {
return in_vars[i]->Get<SelectedRows>();
}
};
auto* out = ctx.Output<SelectedRows>("Out");
out->mutable_rows()->clear();
auto* out_value = out->mutable_value();
// Runtime InferShape
size_t first_dim = 0;
for (int i = 0; i < N; i++) {
auto& sel_row = get_selected_row(i);
first_dim += sel_row.rows().size();
}
std::vector<int64_t> in_dim;
for (int i = 0; i < N; i++) {
auto& sel_row = get_selected_row(i);
if (sel_row.rows().size() > 0) {
in_dim = framework::vectorize(sel_row.value().dims());
break;
}
}
if (in_dim.empty()) {
VLOG(3) << "WARNING: all the inputs are empty";
in_dim = framework::vectorize(get_selected_row(N - 1).value().dims());
} else {
in_dim[0] = static_cast<int64_t>(first_dim);
}
in_dim[0] = static_cast<int64_t>(first_dim);
out_value->Resize(framework::make_ddim(in_dim));
out_value->mutable_data<T>(ctx.GetPlace());
// if all the input sparse vars are empty, no need to
// merge these vars.
if (first_dim == 0UL) {
return;
}
math::SelectedRowsAddTo<CPUDeviceContext, T> functor;
int64_t offset = 0;
for (int i = 0; i < N; i++) {
auto& sel_row = get_selected_row(i);
if (sel_row.rows().size() == 0) {
continue;
}
PADDLE_ENFORCE_EQ(out->height(), sel_row.height());
functor(ctx.template device_context<CPUDeviceContext>(), sel_row,
offset, out);
offset += sel_row.value().numel();
}
} else if (out_var->IsType<framework::LoDTensorArray>()) {
// TODO(@mozga-intel) Add MKLDNN LoDTensorArray support
auto& out_array = *out_var->GetMutable<framework::LoDTensorArray>();
for (size_t i = in_place ? 1 : 0; i < in_vars.size(); ++i) {
PADDLE_ENFORCE(in_vars[i]->IsType<framework::LoDTensorArray>(),
"Only support all inputs are TensorArray");
auto& in_array = in_vars[i]->Get<framework::LoDTensorArray>();
for (size_t i = 0; i < in_array.size(); ++i) {
if (in_array[i].numel() != 0) {
if (i >= out_array.size()) {
out_array.resize(i + 1);
}
if (out_array[i].numel() == 0) {
framework::TensorCopy(in_array[i], in_array[i].place(),
ctx.device_context(), &out_array[i]);
out_array[i].set_lod(in_array[i].lod());
} else {
PADDLE_ENFORCE(out_array[i].lod() == in_array[i].lod());
auto in = EigenVector<T>::Flatten(in_array[i]);
auto result = EigenVector<T>::Flatten(out_array[i]);
result.device(*ctx.template device_context<MKLDNNDeviceContext>()
.eigen_device()) = result + in;
}
}
}
}
} else {
PADDLE_THROW("Unexpected branch, output variable type is %s",
framework::ToTypeName(out_var->Type()));
} else { // Fallback to naive version
// TODO(@mozga-intel) Add MKLDNN SelectedRows & LoDTensorArray support
SumKernel<CPUDeviceContext, T> reference_kernel;
reference_kernel.Compute(ctx);
}
}
};
......
......@@ -14,6 +14,9 @@ limitations under the License. */
#pragma once
#include <string>
#ifdef PADDLE_WITH_CUDA
#include <cuda_runtime.h>
#endif
namespace paddle {
namespace platform {
......
......@@ -34,7 +34,7 @@ void BindTracer(pybind11::module* m) {
framework::BlockDesc* block,
const platform::CPUPlace expected_place,
const bool stop_gradient = false) {
self.Trace(op, inputs, outputs, block, expected_place,
return self.Trace(op, inputs, outputs, block, expected_place,
stop_gradient);
})
.def("trace",
......@@ -44,7 +44,7 @@ void BindTracer(pybind11::module* m) {
framework::BlockDesc* block,
const platform::CUDAPlace expected_place,
const bool stop_gradient = false) {
self.Trace(op, inputs, outputs, block, expected_place,
return self.Trace(op, inputs, outputs, block, expected_place,
stop_gradient);
})
.def("py_trace", &imperative::Tracer::PyTrace,
......
......@@ -101,7 +101,8 @@ void BindGraph(py::module *m) {
[](Graph &self, Node &node) { return self.RemoveNode(&node); })
.def("retrieve_node", &Graph::RetrieveNode,
return_value_policy::reference)
.def("resolve_hazard", &Graph::ResolveHazard);
.def("resolve_hazard", &Graph::ResolveHazard)
.def("origin_program_desc", &Graph::OriginProgram);
}
void BindNode(py::module *m) {
......
......@@ -189,6 +189,8 @@ void BindBlockDesc(pybind11::module *m) {
return self.HasVar(name);
},
pybind11::return_value_policy::reference)
.def("_clear_block", [](pd::BlockDesc &self) { return self.Clear(); },
pybind11::return_value_policy::reference)
.def("_rename_var",
[](pd::BlockDesc &self, const pybind11::bytes &byte_name,
const pybind11::bytes &byte_name_new) {
......
......@@ -551,9 +551,9 @@ All parameter, weight, gradient are variables in Paddle.
m, "LoDTensorBlockingQueue", "")
.def("push",
[](LoDTensorBlockingQueue &self,
std::vector<framework::LoDTensor> &lod_tensor_vec) {
const std::vector<framework::LoDTensor> &lod_tensor_vec) {
pybind11::gil_scoped_release release;
return self.Push(std::move(lod_tensor_vec));
return self.Push(lod_tensor_vec);
})
.def("size", &LoDTensorBlockingQueue::Size)
.def("capacity", &LoDTensorBlockingQueue::Cap)
......@@ -994,6 +994,7 @@ All parameter, weight, gradient are variables in Paddle.
[](ir::PassBuilder &self, size_t idx) { self.RemovePass(idx); });
// -- python binds for parallel executor.
py::class_<ParallelExecutor> pe(m, "ParallelExecutor");
py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy", R"DOC(
ExecutionStrategy allows the user to more preciously control how to run
......@@ -1231,9 +1232,9 @@ All parameter, weight, gradient are variables in Paddle.
cannot be updated after being finalized.)DOC");
pe.def(py::init<const std::vector<platform::Place> &,
const std::unordered_set<std::string> &, const ProgramDesc &,
const std::string &, Scope *, std::vector<Scope *> &,
const ExecutionStrategy &, const BuildStrategy &>())
const std::unordered_set<std::string> &, const std::string &,
Scope *, std::vector<Scope *> &, const ExecutionStrategy &,
const BuildStrategy &, ir::Graph *>())
// NOTE: even we return a vec<Scope*>* to Python use reference policy.
// We still cannot get local_scope from this vector, since the element
// of vec<Scope*> will be freed by Python GC. We can only return Scope*
......
......@@ -444,6 +444,7 @@ function assert_api_spec_approvals() {
"paddle/fluid/framework/ir/node.h"
"paddle/fluid/framework/ir/graph.h"
"paddle/fluid/framework/framework.proto"
"python/paddle/fluid/compiler.py"
"paddle/fluid/operators/distributed/send_recv.proto.in")
for API_FILE in ${API_FILES[*]}; do
API_CHANGE=`git diff --name-only upstream/$BRANCH | grep "${API_FILE}" || true`
......
......@@ -17,10 +17,10 @@ import os
import six
import sys
from .. import compat as cpt
from . import framework
from .framework import cuda_places, cpu_places
from . import core
from . import framework
__all__ = ['CompiledProgram', 'ExecutionStrategy', 'BuildStrategy']
......@@ -38,7 +38,7 @@ def _place_obj(place):
class CompiledProgram(object):
"""
Compiles a Program for execution.
Compiles to Graph for execution.
1. Users first create the program with layers.
2. Optionally, users use CompiledProgram to optimize the program before run.
......@@ -53,7 +53,7 @@ class CompiledProgram(object):
Example:
.. code-block:: python
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
place = fluid.CUDAPlace(0) if use_gpu else fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(startup)
compiled_prog = compiler.CompiledProgram(main).with_data_parallel(
......@@ -64,11 +64,25 @@ class CompiledProgram(object):
fetch_list=[loss.name])
Args:
program: Program instance that contains the model logic.
program_or_graph (Graph|Program): If it's Program, it will be first
lowered to a graph for further optimizations. If it's a graph
(potentially optimized before), it will be directly used for
further optimizations. Note: graph is only supported when compiled
with with_data_parallel option.
"""
def __init__(self, program):
self._program = program
def __init__(self, program_or_graph):
if isinstance(program_or_graph, core.Graph):
self._graph = program_or_graph
self._program = None
elif isinstance(program_or_graph, framework.Program):
self._graph = core.Graph(program_or_graph.desc)
self._program = program_or_graph
else:
raise ValueError("Wrong program_to_graph type: %s" %
type(program_or_graph))
self._program_desc = self._graph.origin_program_desc()
self._scope = None
self._place = None
self._executor = None
......@@ -110,6 +124,7 @@ class CompiledProgram(object):
self
"""
assert not self._is_data_parallel, "Already compiled with parallel."
assert not self._is_inference, "Cannot compile both data parallel and inference"
self._is_data_parallel = True
self._build_strategy = build_strategy
self._exec_strategy = exec_strategy
......@@ -137,11 +152,13 @@ class CompiledProgram(object):
Returns:
self
"""
assert not self._is_data_parallel, "Cannot compile both data parallel and inference"
assert not self._is_inference, "Already compiled with inference"
assert any([
isinstance(config, InferNativeConfig),
isinstance(config, InferAnalysisConfig)
])
self._is_data_parallel = False
self._is_inference = True
self._infer_config = config
return self
......@@ -185,37 +202,41 @@ class CompiledProgram(object):
else:
self._exec_strategy.num_threads = len(self._places) * 2
trainers_endpoints = self._program._trainers_endpoints
# FIXME(dzhwinter): enable_inplace should be after memory_optimize
# if turn on python memory optimize, turn off the inplace_pass.
if self._build_strategy.memory_optimize is None:
self._build_strategy.memory_optimize = False if self._program._is_mem_optimized else True
self._build_strategy.memory_optimize = False if self._program and self._program._is_mem_optimized else True
if self._build_strategy.enable_inplace is None:
self._build_strategy.enable_inplace = False if self._program._is_mem_optimized else True
self._build_strategy.enable_inplace = False if self._program and self._program._is_mem_optimized else True
# TODO(wuyi): trainer endpoings should be passed in through
# build_strategy, not program.xxx.
if self._program and self._build_strategy.num_trainers > 1 and \
self._program._trainers_endpoints:
tps = self._program._trainers_endpoints
if self._build_strategy.num_trainers > 1 and trainers_endpoints:
assert self._build_strategy.num_trainers == len(
trainers_endpoints), "num_trainers == len(end_points)"
self._build_strategy.trainers_endpoints = trainers_endpoints
self._persistable_vars = set([
cpt.to_text(v.name)
for v in [
var for var in self._program.list_vars()
if var.persistable and var.type != core.VarDesc.VarType.RAW
]
tps), "num_trainers == len(end_points)"
self._build_strategy.trainers_endpoints = tps
self._persistable_vars = []
for block_id in range(self._program_desc.num_blocks()):
bdesc = self._program_desc.block(block_id)
self._persistable_vars.extend([
cpt.to_text(v.name()) for v in bdesc.all_vars()
if v.persistable() and v.type() != core.VarDesc.VarType.RAW
])
places = list(map(_place_obj, self._places))
return core.ParallelExecutor(
places, self._persistable_vars, self._program.desc,
places,
set(self._persistable_vars),
cpt.to_text(self._loss_name)
if self._loss_name else six.u(''), self._scope, self._local_scopes,
self._exec_strategy, self._build_strategy)
self._exec_strategy, self._build_strategy, self._graph)
def _compile_inference(self):
assert self._is_data_parallel is False
return core.create_paddle_predictor(self._infer_config)
def _compile(self, scope, place):
......
......@@ -538,6 +538,8 @@ class Executor(object):
else:
# TODO(panyx0718): Can compile program to optimize executor
# performance.
# TODO(panyx0718): executor should be able to run graph.
assert program._program, "CompiledProgram is compiled from graph, can only run with_data_parallel."
return self._run(
program._program,
self._default_executor,
......
......@@ -472,16 +472,19 @@ class Variable(object):
# get_capacity is implemented
pass
self.block.vars[name] = self
self.op = None
self.stop_gradient = stop_gradient
self.is_data = is_data
if _in_imperative_mode():
# record vars in tracer rather than blocks
self._ivar = kwargs.get("ivar", None)
if not self._ivar:
self._ivar = core.VarBase()
self._ivar = core.VarBase(stop_gradient)
self._ivar.desc = self.desc
self._ivar.stop_gradient = stop_gradient
if persistable:
self.block.vars[name] = self
else:
self.block.vars[name] = self
self.op = None
self.stop_gradient = stop_gradient
self.is_data = is_data
def _numpy(self):
new_ivar = self._ivar._copy_to(core.CPUPlace(), True)
......@@ -824,6 +827,7 @@ class Operator(object):
if _in_imperative_mode():
self.iop = core.OpBase()
self.iop.desc = self.desc
self.inputs = defaultdict(list)
if inputs is not None:
for k, v in six.iteritems(inputs):
......@@ -831,6 +835,7 @@ class Operator(object):
self.inputs[k].append(v._ivar)
elif isinstance(v, list) or isinstance(v, tuple):
self.inputs[k].extend([var._ivar for var in v])
self.outputs = defaultdict(list)
if outputs is not None:
for k, v in six.iteritems(outputs):
......@@ -1280,6 +1285,15 @@ class Block(object):
else:
raise ValueError("Var {0} is not found recursively".format(name))
def _clear_block(self):
# TODO(minqiyang): move this to backward_hooks
self.desc._clear_block()
for name in self.vars.keys():
assert self.vars[name].persistable
del self.ops[:]
def all_parameters(self):
return list(self.iter_parameters())
......@@ -1410,18 +1424,31 @@ class Block(object):
inputs=kwargs.get("inputs", None),
outputs=kwargs.get("outputs", None),
attrs=kwargs.get("attrs", None))
self.ops.append(op)
# TODO(minqiyang): add stop_gradient support in static mode too.
if _in_imperative_mode():
# record ops in tracer rather than blocks
#
# TODO(minqiyang): add op stop_gradient support in static mode too.
# currently, we only support stop_gradient in imperative mode.
self._trace_op(op, kwargs.get("stop_gradient", False))
self.ops.append(op)
return op
def _trace_op(self, op, stop_gradient=False):
if _in_imperative_mode():
_imperative_tracer().trace(op.iop, op.inputs, op.outputs, self.desc,
_imperative_current_expected_place_,
stop_gradient)
backward_refs = _imperative_tracer().trace(
op.iop, op.inputs, op.outputs, self.desc,
_imperative_current_expected_place_, stop_gradient)
# TODO(minqiyang): support backward_hooks to eager remove backward_refs
op.backward_refs = defaultdict(list)
for k, v in six.iteritems(op.inputs):
if k in backward_refs:
op.backward_refs[k] = op.inputs[k]
for k, v in six.iteritems(op.outputs):
if k in backward_refs:
op.backward_refs[k] = op.outputs[k]
def _insert_op(self, index, *args, **kwargs):
"""
......@@ -1476,6 +1503,7 @@ class Block(object):
outputs=kwargs.get("outputs", None),
attrs=kwargs.get("attrs", None))
self.ops.insert(0, op)
if _in_imperative_mode():
self._trace_op(op, kwargs.get("stop_gradient", False))
return op
......
......@@ -176,10 +176,13 @@ class ParallelExecutor(object):
places = list(map(place_obj, self._places))
# step7: init ParallelExecutor
# ParallelExecutor API will be deprecated, don't support parallel graph.
self._graph = core.Graph(main.desc)
self.executor = core.ParallelExecutor(
places, persistable_vars, main.desc,
places, persistable_vars,
cpt.to_text(loss_name) if loss_name else six.u(''), scope,
local_scopes, exec_strategy, build_strategy)
local_scopes, exec_strategy, build_strategy, self._graph)
self.scope = scope
......
......@@ -12,14 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import core
from . import core
import six
import threading
from .framework import Program, Variable, program_guard, default_main_program, default_startup_program
from .executor import global_scope
from .data_feeder import DataFeeder
from .layers.io import monkey_patch_reader_methods, _copy_reader_var_, double_buffer
import unique_name
from .unique_name import UniqueNameGenerator
__all__ = ['PyReader']
......@@ -40,7 +40,7 @@ def _convert_places(places):
class PyReader(object):
unique_name_generator = unique_name.UniqueNameGenerator()
unique_name_generator = UniqueNameGenerator()
def __init__(self,
feed_list,
......@@ -272,7 +272,7 @@ class PyReader(object):
Set the data source of the PyReader object.
The provided :code:`reader` should be a Python generator,
which yields numpy-typed batched data.
which yields list(numpy.ndarray) typed batched data.
:code:`places` must be set when the PyReader object is iterable.
......@@ -298,7 +298,7 @@ class PyReader(object):
Set the data source of the PyReader object.
The provided :code:`reader` should be a Python generator,
which yields LoDTensor-typed batched data.
which yields numpy.ndarray-typed or LoDTensor-typed batched data.
:code:`places` must be set when the PyReader object is iterable.
......
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
from op_test import OpTest
class TestAllocContinuousSpace(OpTest):
def setUp(self):
self.op_type = "alloc_continuous_space"
self.dtype = np.float32
attrs = self.init_attr()
self.copy_data = attrs["copy_data"]
self.constant = attrs["constant"]
self.set_constant = attrs["set_constant"]
self.Inputs = self.init_input()
self.FusedOutput = self.init_output(self.Inputs, self.set_constant,
self.constant)
self.inputs = {'Input': self.Inputs}
self.attrs = attrs
self.outputs = {'Output': self.Inputs, 'FusedOutput': self.FusedOutput}
def init_dtype(self):
self.dtype = np.float32
def init_input(self):
inputs = []
inputs.append(("x1", np.random.random([20, 3]).astype(self.dtype)))
inputs.append(("x2", np.random.random([20]).astype(self.dtype)))
inputs.append(("x3", np.random.random([1]).astype(self.dtype)))
inputs.append(("x4", np.random.random([200, 30]).astype(self.dtype)))
inputs.append(("x5", np.random.random([30]).astype(self.dtype)))
inputs.append(("x6", np.random.random([1]).astype(self.dtype)))
return inputs
def init_attr(self):
return {"copy_data": True, "set_constant": False, "constant": 0.0}
def init_output(self, input_list, set_constant, constant):
inputs = [input[1].flatten() for input in input_list]
output = np.concatenate(inputs)
if set_constant:
output = np.ones((len(output))) * constant
return output
def test_check_output(self):
self.check_output()
class TestAllocContinuousSpace2(TestAllocContinuousSpace):
def init_attr(self):
return {"copy_data": False, "set_constant": True, "constant": 0.5}
def test_check_output(self):
self.check_output(no_check_set=["Output"])
if __name__ == '__main__':
unittest.main()
......@@ -105,7 +105,7 @@ class MNIST(fluid.imperative.Layer):
class TestImperativeMnist(unittest.TestCase):
def test_mnist_float32(self):
seed = 90
batch_num = 2
epoch_num = 1
with fluid.imperative.guard():
fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed
......@@ -113,17 +113,16 @@ class TestImperativeMnist(unittest.TestCase):
mnist = MNIST("mnist")
sgd = SGDOptimizer(learning_rate=1e-3)
train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=128)
paddle.dataset.mnist.train(), batch_size=128, drop_last=True)
dy_param_init_value = {}
for epoch in range(epoch_num):
for batch_id, data in enumerate(train_reader()):
if batch_id >= batch_num:
break
dy_x_data = np.array(
[x[0].reshape(1, 28, 28) for x in data]).astype('float32')
y_data = np.array([x[1] for x in data]).astype('int64').reshape(
128, 1)
[x[0].reshape(1, 28, 28)
for x in data]).astype('float32')
y_data = np.array(
[x[1] for x in data]).astype('int64').reshape(128, 1)
img = to_variable(dy_x_data)
label = to_variable(y_data)
......@@ -132,19 +131,21 @@ class TestImperativeMnist(unittest.TestCase):
cost = mnist(img)
loss = fluid.layers.cross_entropy(cost, label)
avg_loss = fluid.layers.mean(loss)
dy_out = avg_loss._numpy()
if batch_id == 0:
for param in fluid.default_main_program().global_block(
).all_parameters():
if epoch == 0 and batch_id == 0:
for param in mnist.parameters():
dy_param_init_value[param.name] = param._numpy()
avg_loss._backward()
sgd.minimize(avg_loss)
mnist.clear_gradients()
fluid.default_main_program().global_block()._clear_block()
dy_param_value = {}
for param in fluid.default_main_program().global_block(
).all_parameters():
for param in mnist.parameters():
dy_param_value[param.name] = param._numpy()
with new_program_scope():
......@@ -157,7 +158,7 @@ class TestImperativeMnist(unittest.TestCase):
mnist = MNIST("mnist")
sgd = SGDOptimizer(learning_rate=1e-3)
train_reader = paddle.batch(
paddle.dataset.mnist.train(), batch_size=128)
paddle.dataset.mnist.train(), batch_size=128, drop_last=True)
img = fluid.layers.data(
name='pixel', shape=[1, 28, 28], dtype='float32')
......@@ -170,8 +171,7 @@ class TestImperativeMnist(unittest.TestCase):
# initialize params and fetch them
static_param_init_value = {}
static_param_name_list = []
for param in fluid.default_startup_program().global_block(
).all_parameters():
for param in mnist.parameters():
static_param_name_list.append(param.name)
out = exe.run(fluid.default_startup_program(),
......@@ -180,18 +180,18 @@ class TestImperativeMnist(unittest.TestCase):
for i in range(len(static_param_name_list)):
static_param_init_value[static_param_name_list[i]] = out[i]
for epoch in range(epoch_num):
for batch_id, data in enumerate(train_reader()):
if batch_id >= batch_num:
break
static_x_data = np.array(
[x[0].reshape(1, 28, 28) for x in data]).astype('float32')
y_data = np.array([x[1] for x in data]).astype('int64').reshape(
[128, 1])
[x[0].reshape(1, 28, 28)
for x in data]).astype('float32')
y_data = np.array(
[x[1] for x in data]).astype('int64').reshape([128, 1])
fetch_list = [avg_loss.name]
fetch_list.extend(static_param_name_list)
out = exe.run(fluid.default_main_program(),
out = exe.run(
fluid.default_main_program(),
feed={"pixel": static_x_data,
"label": y_data},
fetch_list=fetch_list)
......@@ -199,7 +199,10 @@ class TestImperativeMnist(unittest.TestCase):
static_param_value = {}
static_out = out[0]
for i in range(1, len(out)):
static_param_value[static_param_name_list[i - 1]] = out[i]
static_param_value[static_param_name_list[i - 1]] = out[
i]
self.assertTrue(np.allclose(dy_x_data.all(), static_x_data.all()))
for key, value in six.iteritems(static_param_init_value):
self.assertTrue(np.allclose(value, dy_param_init_value[key]))
......@@ -207,7 +210,7 @@ class TestImperativeMnist(unittest.TestCase):
self.assertTrue(np.allclose(static_out, dy_out))
for key, value in six.iteritems(static_param_value):
self.assertTrue(np.allclose(value, dy_param_value[key]))
self.assertTrue(np.allclose(value, dy_param_value[key], atol=1e-5))
if __name__ == '__main__':
......
......@@ -231,7 +231,7 @@ class TestImperativeResnet(unittest.TestCase):
seed = 90
batch_size = train_parameters["batch_size"]
batch_num = 1
batch_num = 2
with fluid.imperative.guard():
fluid.default_startup_program().random_seed = seed
fluid.default_main_program().random_seed = seed
......@@ -286,6 +286,8 @@ class TestImperativeResnet(unittest.TestCase):
optimizer.minimize(avg_loss)
resnet.clear_gradients()
fluid.default_main_program().global_block()._clear_block()
dy_param_value = {}
for param in resnet.parameters():
dy_param_value[param.name] = param._numpy()
......@@ -319,11 +321,9 @@ class TestImperativeResnet(unittest.TestCase):
static_param_init_value = {}
static_param_name_list = []
static_grad_name_list = []
for param in fluid.default_startup_program().global_block(
).all_parameters():
for param in resnet.parameters():
static_param_name_list.append(param.name)
for param in fluid.default_main_program().global_block(
).all_parameters():
for param in resnet.parameters():
if not param.stop_gradient:
static_grad_name_list.append(param.name +
core.grad_var_suffix())
......
......@@ -13,21 +13,47 @@
# limitations under the License.
import os
import sys
import unittest
from timeit import default_timer as timer
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
import paddle.dataset.wmt16 as wmt16
os.environ['FLAGS_eager_delete_tensor_gb'] = "0.0"
os.environ[
'RECORDIO_FILENAME'] = '/tmp/ir_memory_optimize_transformer.wmt16.recordio'
from test_parallel_executor_transformer import TestTransformer
from test_parallel_executor_transformer import transformer
from test_parallel_executor_transformer import transformer, ModelHyperParams, transformer_model, transformer, prepare_batch_input
from parallel_executor_test_base import TestParallelExecutorBase
# disable temporarily because of timeout.
sys.exit(0)
# NOTE(dzhwinter): test diferent strategy colisions.
# open the eager delete tensor strategy by default.
class TestTransformerWithIR(TestTransformer):
class TestTransformerWithIR(TestParallelExecutorBase):
@classmethod
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
reader = paddle.batch(
wmt16.train(ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size),
batch_size=transformer_model.batch_size)
with fluid.recordio_writer.create_recordio_writer(
os.environ.get("RECORDIO_FILENAME")) as writer:
for batch in reader():
for tensor in prepare_batch_input(
batch, ModelHyperParams.src_pad_idx,
ModelHyperParams.trg_pad_idx, ModelHyperParams.n_head):
t = fluid.LoDTensor()
t.set(tensor, fluid.CPUPlace())
writer.append_tensor(t)
writer.complete_append_tensor()
def test_main(self):
if core.is_compiled_with_cuda():
# check python transpiler
......@@ -35,13 +61,15 @@ class TestTransformerWithIR(TestTransformer):
transformer,
use_cuda=True,
memory_opt=True,
use_ir_memory_optimize=False)
use_ir_memory_optimize=False,
iter=2)
# check IR memory optimize
self.check_network_convergence(
transformer,
use_cuda=True,
memory_opt=False,
use_ir_memory_optimize=True)
use_ir_memory_optimize=True,
iter=2)
if __name__ == '__main__':
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册