diff --git a/doc/fluid/design/ir/draft.md b/doc/fluid/design/ir/draft.md index a141dcbca584c6064c8da863410692a8be911d12..a33b5a9c9312c93247a1e1f3431061a5aad6c884 100644 --- a/doc/fluid/design/ir/draft.md +++ b/doc/fluid/design/ir/draft.md @@ -1,16 +1,16 @@ ## Motivation -There is a ```gap``` between the ```Program``` defined by -user and the ```Executable``` that can be scheduled +There is a `gap` between the `Program` defined by +user and the `Executable` that can be scheduled efficiently on heterogeneous hardware, either locally or distributedly. -Usually, the ```gap``` is bridged by +Usually, the `gap` is bridged by * A serious transformations with defined order. * These transformations usually involve -```insert, delete, clustering, split, dependency analysis```. +`insert, delete, clustering, split, dependency analysis`. * Has a simple way to verify and debug each transformation. @@ -38,44 +38,44 @@ design below. #### Node -```Node``` represents an operation that performs some computation or +`Node` represents an operation that performs some computation or a variable that is input or output of operation. -```Node```s are connected to other ```Node```s via inputs and outputs. +`Node`s are connected to other `Node`s via inputs and outputs. Other properties (maybe device placement information) can be added -to ```Node``` in the future if it's a -common requirement of many other ```Pass```es. Otherwise, it should live -in a ```Node``` wrapper class that is private to some ```Pass``` or be -a local member of a ```Pass```. +to `Node` in the future if it's a +common requirement of many other `Pass`es. Otherwise, it should live +in a `Node` wrapper class that is private to some `Pass` or be +a local member of a `Pass`. #### Graph -```Graph``` contains a list of ```Node```s, which are connected to +`Graph` contains a list of `Node`s, which are connected to each other via inputs and outputs. TODO: Better definitions for the graph. -```Graph``` can also contain ```Attribute```s. ```Attribute```s -can be ``any`` thing. For example, it can be a list of "wraper" -nodes. The ```wrapper``` nodes compose ```Node```s and provide -helper method for execution or transformation. ```Attribute``` +`Graph` can also contain `Attribute`s. `Attribute`s +can be `any` thing. For example, it can be a list of "wrapper" +nodes. The `wrapper` nodes compose `Node`s and provide +helper methods for execution or transformation. `Attribute` can also contain other things that describe some properties of -the ```Graph``` or ```Graph``` nodes. ```Attribute``` can be passed -across ```Pass```. However, it should be used with care. +the `Graph` or `Graph` nodes. `Attribute` can be passed +across `Pass`. However, it should be used with care. #### Pass -```Pass``` represents a transformation of ```Graph```. Its input -is a ```Graph``` and its output is also a ```Graph```. For example, -a ```Pass``` can simply print out the ```Graph```. A ```Pass``` -can also fuse some ```Graph```'s ```Node```s. +`Pass` represents a transformation of `Graph`. Its input +is a `Graph` and its output is also a `Graph`. For example, +a `Pass` can simply print out the `Graph`. A `Pass` +can also fuse some `Graph`'s `Node`s. #### Optimize -```Optimize``` contains a series of ```Pass``` with defined order. -```Optimize``` transforms a ```Graph``` that only contains raw -modeling logic to a ```Graph``` that can be run efficiently while +`Optimize` contains a series of `Pass` with defined order. 
+`Optimize` transforms a `Graph` that only contains raw +modeling logic to a `Graph` that can be run efficiently while maintaining the original modeling logic. diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index df55b3d05402f1aeecfd8d4218a637a81d58ed87..9df7df1f42886d40210b16aa2ae5823e3310bfe7 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -1,11 +1,11 @@ -cc_library(var_handle SRCS var_handle.cc DEPS place framework_proto) +cc_library(var_handle SRCS var_handle.cc DEPS place framework_proto node) cc_library(op_handle_base SRCS op_handle_base.cc DEPS var_handle device_context lod_tensor) cc_library(scale_loss_grad_op_handle SRCS scale_loss_grad_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory) cc_library(fetch_op_handle SRCS fetch_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory) cc_library(computation_op_handle SRCS computation_op_handle.cc DEPS framework_proto scope place operator op_registry) cc_library(rpc_op_handle SRCS rpc_op_handle.cc DEPS framework_proto scope place operator op_registry) -cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS graph) +cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS graph graph_helper) cc_library(ssa_graph_printer SRCS ssa_graph_printer.cc DEPS ssa_graph_builder) cc_library(ssa_graph_checker SRCS ssa_graph_checker.cc DEPS ssa_graph_builder) diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index f1f8674caf663ce38df5a2eecbcf690b5ca87dc4..22f0cb20d01cc5b40325ec37a8c7cd44105bc6c6 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -25,6 +25,7 @@ #include "paddle/fluid/framework/details/reduce_op_handle.h" #include "paddle/fluid/framework/details/rpc_op_handle.h" #include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h" +#include "paddle/fluid/framework/ir/graph_helper.h" #include "paddle/fluid/framework/ir/node.h" #include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/scope.h" @@ -67,7 +68,8 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( } } -void MultiDevSSAGraphBuilder::CreateOpHandleIOs(Graph *result, ir::Node *node, +void MultiDevSSAGraphBuilder::CreateOpHandleIOs(ir::Graph *result, + ir::Node *node, size_t place_id) const { auto p = places_[place_id]; auto *op_handle = result->Get("ops").back().get(); @@ -92,12 +94,11 @@ void MultiDevSSAGraphBuilder::CreateOpHandleIOs(Graph *result, ir::Node *node, } std::vector MultiDevSSAGraphBuilder::FindDistTrainSendVars( - const std::vector> &nodes) const { + const std::vector &nodes) const { std::vector send_vars; // since parameters are all in block 0, // it's enough to only scan send ops in block 0 for (auto &node : nodes) { - if (node->NodeType() != ir::Node::Type::kOperation) continue; OpDesc *op = node->Op(); // TODO(Yancey1989): use a graceful method to find send op, // instead of the the hard code string @@ -112,10 +113,9 @@ std::vector MultiDevSSAGraphBuilder::FindDistTrainSendVars( } std::vector MultiDevSSAGraphBuilder::FindDistTrainRecvVars( - const std::vector> &nodes) const { + const std::vector &nodes) const { std::vector recv_vars; for (auto &node : nodes) { - if (node->NodeType() != ir::Node::Type::kOperation) continue; OpDesc *op = node->Op(); // TODO(Yancey1989): use a graceful method to find recv op, // 
instead of the hard code string @@ -170,6 +170,7 @@ size_t MultiDevSSAGraphBuilder::GetAppropriateDeviceID( const std::vector &var_names) const { int64_t numel_sum = 0; for (auto var_name : var_names) { + if (all_vars_.find(var_name) == all_vars_.end()) continue; auto var_desc = all_vars_.at(var_name); PADDLE_ENFORCE_NOT_NULL(var_desc); auto dim = framework::make_ddim(var_desc->GetShape()); @@ -186,19 +187,70 @@ size_t MultiDevSSAGraphBuilder::GetAppropriateDeviceID( return dev_id; } -std::unique_ptr MultiDevSSAGraphBuilder::Apply( - std::unique_ptr graph) const { - // Rebuild the graph structure. - auto nodes = std::move(graph->nodes); - graph->nodes.clear(); +// Topology sort the graph nodes from inputs to outputs. +// Since SSAGraphBuilder depends on forward/backward nodes to assign devices +// to parameter/gradients before optimizer ops, topo sort is insufficient. ( +// some optimizer ops might not depend on any nodes), we manually move all +// optimizer nodes after last backward nodes. +// However, the assumption by SSAGraphBuilder should be relaxed in the future. +std::vector SortOpsAndDelayOptimizeOp(const ir::Graph &graph) { + std::vector ret = ir::TopologySortOperations(graph); + size_t last_backward = 0; + for (size_t i = 0; i < ret.size(); ++i) { + if (boost::get( + ret[i]->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())) == + static_cast(OpRole::kBackward)) { + last_backward = i; + } + } + + std::vector optimize_ops; + std::vector sorted_ret; + for (size_t i = 0; i < ret.size(); ++i) { + if (i < last_backward) { + if (boost::get(ret[i]->Op()->GetAttr( + OpProtoAndCheckerMaker::OpRoleAttrName())) == + static_cast(OpRole::kOptimize)) { + optimize_ops.push_back(ret[i]); + } else { + sorted_ret.push_back(ret[i]); + } + } else if (i == last_backward) { + sorted_ret.push_back(ret[i]); + // Verify that no operations before optimize ops depends on optimize ops. + std::unordered_set optimize_set(optimize_ops.begin(), + optimize_ops.end()); + for (ir::Node *n : sorted_ret) { + for (ir::Node *in : n->inputs) { + for (ir::Node *pre_n : in->inputs) { + PADDLE_ENFORCE(optimize_set.find(pre_n) == optimize_set.end(), + "optimize operations cannot be depended by forward " + "or backward node %s -> %s", + pre_n->Name(), n->Name()); + } + } + } + sorted_ret.insert(sorted_ret.end(), optimize_ops.begin(), + optimize_ops.end()); + } else { + sorted_ret.push_back(ret[i]); + } + } + return sorted_ret; +} + +std::unique_ptr MultiDevSSAGraphBuilder::Apply( + std::unique_ptr graph) const { + // Give the topology sort order and rebuild the graph structure. + std::vector sorted_ops = SortOpsAndDelayOptimizeOp(*graph); + auto nodes = graph->ReleaseNodes(); + ir::Graph &result = *graph; for (auto &node : nodes) { - if (node->NodeType() == ir::Node::Type::kVariable) { + if (node->NodeType() == ir::Node::Type::kVariable && node->Var()) { all_vars_.emplace(node->Name(), node->Var()); } } - - Graph &result = *graph; std::unordered_set og_has_been_broadcast; // We cannot invoke resize. 
It is a bug of GCC 4.8 @@ -208,8 +260,8 @@ std::unique_ptr MultiDevSSAGraphBuilder::Apply( // find send/recv vars so that we can place the distributed training // realted op in the place 0 - auto send_vars = FindDistTrainSendVars(nodes); - auto recv_vars = FindDistTrainRecvVars(nodes); + auto send_vars = FindDistTrainSendVars(sorted_ops); + auto recv_vars = FindDistTrainRecvVars(sorted_ops); std::vector> bcast_var_name_set; bcast_var_name_set.resize(places_.size()); @@ -217,22 +269,18 @@ std::unique_ptr MultiDevSSAGraphBuilder::Apply( size_t cur_device_id = 0; bool is_forwarding = true; - // NOTE: Currently, passes before SSAGraphBuilder cannot reorder - // forward, backward nodes. E.g. you can't append an forward node - // at the end of the node list. - // TODO(panyx0718): FIXME: Needs to sort by forward->backward order. - for (auto &node : nodes) { - if (node->NodeType() != ir::Node::Type::kOperation) continue; + for (ir::Node *node : sorted_ops) { if (boost::get( node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())) == static_cast(OpRole::kRPC)) { - CreateRPCOp(&result, node.get()); - } else if (IsDistTrainOp(node.get(), send_vars, recv_vars)) { - CreateDistTrainOp(&result, node.get()); - } else if (IsScaleLossOp(node.get())) { + CreateRPCOp(&result, node); + } else if (IsDistTrainOp(node, send_vars, recv_vars)) { + CreateDistTrainOp(&result, node); + } else if (IsScaleLossOp(node)) { // user can customize loss@grad if not use_default_grad_scale_ if (strategy_.gradient_scale_ != BuildStrategy::GradientScaleStrategy::kCustomized) { + // TODO(paddle-dev): Why is there no input for this op_handle? CreateScaleLossGradOp(&result); } // This assumes the backward generating code will ensure IsScaleLossOp @@ -241,24 +289,23 @@ std::unique_ptr MultiDevSSAGraphBuilder::Apply( // the block. is_forwarding = false; } else { - int op_dev_id = GetOpDeviceID(node.get()); + int op_dev_id = GetOpDeviceID(node); if (op_dev_id != -1) { // This op only runs on one specific device. - CreateComputationalOp(&result, node.get(), op_dev_id); + CreateComputationalOp(&result, node, op_dev_id); for (ir::Node *n : node->outputs) { var_name_on_devices_.emplace(n->Name(), op_dev_id); } } else { // This op runs on all devices, and its output may have parameter's // gradients. + // TODO(paddle-dev): Why is so special about "read" op? if (node->Op()->Type() == "read" && strategy_.enable_data_balance_) { node->Op()->SetAttr("throw_eof_exp", false); - CreateComputationalOps(&result, node.get(), places_.size()); - // TODO(paddle-dev): builder shouldn't depend on the out logic of - // a specific op. + CreateComputationalOps(&result, node, places_.size()); const auto &data_var_names = node->Op()->Output("Out"); InsertDataBalanceOp(&result, data_var_names); } else { - CreateComputationalOps(&result, node.get(), places_.size()); + CreateComputationalOps(&result, node, places_.size()); } if (!is_forwarding && places_.size() > 1) { @@ -322,17 +369,17 @@ std::unique_ptr MultiDevSSAGraphBuilder::Apply( } } } - /* - Dependency graph has been constructed. However, there are still data - hazards need to be handled. - */ + Dependency graph has been constructed. However, there are still data + hazards need to be handled. + */ PolishGraphToSupportDataHazards(&result); /* * Only variables should be the leaves of graph. 
*/ AddOutputToLeafOps(&result); + PADDLE_ENFORCE(!ir::HasCircle(result)); return graph; } @@ -357,7 +404,7 @@ void MultiDevSSAGraphBuilder::SetCommunicationContext( #endif } -void MultiDevSSAGraphBuilder::CreateBroadcastOp(Graph *result, +void MultiDevSSAGraphBuilder::CreateBroadcastOp(ir::Graph *result, const std::string &p_name, size_t src_dev_id) const { #ifdef PADDLE_WITH_CUDA @@ -387,7 +434,7 @@ void MultiDevSSAGraphBuilder::CreateBroadcastOp(Graph *result, } } -void MultiDevSSAGraphBuilder::CreateComputationalOp(Graph *result, +void MultiDevSSAGraphBuilder::CreateComputationalOp(ir::Graph *result, ir::Node *node, int dev_id) const { result->Get("ops").emplace_back( @@ -396,7 +443,7 @@ void MultiDevSSAGraphBuilder::CreateComputationalOp(Graph *result, CreateOpHandleIOs(result, node, dev_id); } -void MultiDevSSAGraphBuilder::InsertAllReduceOp(Graph *result, +void MultiDevSSAGraphBuilder::InsertAllReduceOp(ir::Graph *result, const std::string &og) const { #ifdef PADDLE_WITH_CUDA result->Get("ops").emplace_back(new AllReduceOpHandle( @@ -426,7 +473,7 @@ void MultiDevSSAGraphBuilder::InsertAllReduceOp(Graph *result, } void MultiDevSSAGraphBuilder::InsertDataBalanceOp( - Graph *result, const std::vector &datas) const { + ir::Graph *result, const std::vector &datas) const { #ifdef PADDLE_WITH_CUDA result->Get("ops").emplace_back(new DataBalanceOpHandle( result->CreateEmptyNode("data_balance", ir::Node::Type::kOperation), @@ -479,8 +526,8 @@ int MultiDevSSAGraphBuilder::GetOpDeviceID(ir::Node *node) const { PADDLE_ENFORCE_EQ(param_grad.size(), 2U); int dev_id = GetVarDeviceID(param_grad[1]); - PADDLE_ENFORCE_NE(dev_id, -1, "dev_id should not be -1.[%s, %s]", - node->Op()->Type(), param_grad[0]); + PADDLE_ENFORCE_NE(dev_id, -1, "dev_id should not be -1.[%s, %s, %s]", + node->Op()->Type(), param_grad[0], param_grad[1]); return dev_id; } @@ -489,7 +536,7 @@ int MultiDevSSAGraphBuilder::GetVarDeviceID(const std::string &varname) const { return got == var_name_on_devices_.end() ? -1 : got->second; } -void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(Graph *result) const { +void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(ir::Graph *result) const { for (size_t i = 0; i < places_.size(); ++i) { // Insert ScaleCost OpHandle #ifdef PADDLE_WITH_CUDA @@ -519,7 +566,7 @@ void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(Graph *result) const { } } -void MultiDevSSAGraphBuilder::CreateComputationalOps(Graph *result, +void MultiDevSSAGraphBuilder::CreateComputationalOps(ir::Graph *result, ir::Node *node, size_t num_places) const { for (size_t scope_idx = 0; scope_idx < num_places; ++scope_idx) { @@ -531,7 +578,7 @@ void MultiDevSSAGraphBuilder::CreateComputationalOps(Graph *result, } } -VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(Graph *result, +VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(ir::Graph *result, const std::string &og, int dst_dev_id) const { #ifdef PADDLE_WITH_CUDA @@ -564,12 +611,11 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(Graph *result, // Find the first occurence of `prev_op_name` and make current `op` depend // on it. 
-void MultiDevSSAGraphBuilder::ConnectOp(Graph *result, OpHandleBase *op, +void MultiDevSSAGraphBuilder::ConnectOp(ir::Graph *result, OpHandleBase *op, const std::string &prev_op_name) const { for (auto &prev_op : result->Get("ops")) { if (prev_op->Name() == prev_op_name) { - auto *dep_var = new DummyVarHandle( - result->CreateEmptyNode("dummy", ir::Node::Type::kVariable)); + auto *dep_var = new DummyVarHandle(result->CreateControlDepVar()); prev_op->AddOutput(dep_var); result->Get("dep_vars").emplace(dep_var); op->AddInput(dep_var); @@ -577,7 +623,7 @@ void MultiDevSSAGraphBuilder::ConnectOp(Graph *result, OpHandleBase *op, } } -void MultiDevSSAGraphBuilder::CreateDistTrainOp(Graph *result, +void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, ir::Node *node) const { int op_dev_id = -1; std::vector input_var_names; @@ -591,6 +637,7 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(Graph *result, if (node->Op()->Type() == "split_byref" || node->Op()->Type() == "split_selected_rows") { + // TODO(paddle-dev): getting the first var is not safe. op_dev_id = GetVarDeviceID(input_var_names[0]); if (strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { op_dev_id = GetAppropriateDeviceID(input_var_names); @@ -624,10 +671,14 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(Graph *result, } // Create RPC related op handles that connects its in ops and out ops. -void MultiDevSSAGraphBuilder::CreateRPCOp(Graph *result, ir::Node *node) const { +void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result, + ir::Node *node) const { int op_dev_id = -1; if (node->Op()->Type() == "send") { + // TODO(paddle-dev): getting the first var is not safe. op_dev_id = GetVarDeviceID(node->inputs[0]->Name()); + PADDLE_ENFORCE(!ir::IsControlDepVar(*node->inputs[0]), + "This hack no longer holds, please fix."); // the variable name which contains .block means it was splited by // split_byref op // so that we can balance the variable blocks to all the pserver diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h index 2b7f4f586b4e750fde9245286c977258a9db6086..55076f227b5ab56d66b5053173c9e915da23b15f 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h @@ -46,11 +46,13 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::vector &local_scopes, const BuildStrategy &strategy); #endif - std::unique_ptr Apply(std::unique_ptr graph) const override; + std::unique_ptr Apply( + std::unique_ptr graph) const override; int GetVarDeviceID(const std::string &varname) const override; private: - void CreateOpHandleIOs(Graph *result, ir::Node *node, size_t device_id) const; + void CreateOpHandleIOs(ir::Graph *result, ir::Node *node, + size_t device_id) const; private: std::string loss_var_name_; @@ -64,8 +66,8 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { bool IsScaleLossOp(ir::Node *node) const; - void CreateRPCOp(Graph *result, ir::Node *node) const; - void CreateDistTrainOp(Graph *result, ir::Node *node) const; + void CreateRPCOp(ir::Graph *result, ir::Node *node) const; + void CreateDistTrainOp(ir::Graph *result, ir::Node *node) const; /** * Is this operator as the end-point operator before/after send operator. 
@@ -74,21 +76,22 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::vector &recv_vars) const; std::vector FindDistTrainSendVars( - const std::vector> &nodes) const; + const std::vector &nodes) const; std::vector FindDistTrainRecvVars( - const std::vector> &nodes) const; + const std::vector &nodes) const; - void ConnectOp(Graph *result, OpHandleBase *op, + void ConnectOp(ir::Graph *result, OpHandleBase *op, const std::string &prev_op_name) const; - void CreateComputationalOps(Graph *result, ir::Node *node, + void CreateComputationalOps(ir::Graph *result, ir::Node *node, size_t num_places) const; - void CreateScaleLossGradOp(Graph *result) const; - VarHandle *CreateReduceOp(Graph *result, const std::string &og, + void CreateScaleLossGradOp(ir::Graph *result) const; + VarHandle *CreateReduceOp(ir::Graph *result, const std::string &og, int dst_dev_id) const; - void CreateComputationalOp(Graph *result, ir::Node *node, int dev_id) const; + void CreateComputationalOp(ir::Graph *result, ir::Node *node, + int dev_id) const; bool IsParameterGradientOnce( const std::string &og, @@ -96,12 +99,12 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { int GetOpDeviceID(ir::Node *node) const; - void InsertAllReduceOp(Graph *result, const std::string &og) const; + void InsertAllReduceOp(ir::Graph *result, const std::string &og) const; - void InsertDataBalanceOp(Graph *result, + void InsertDataBalanceOp(ir::Graph *result, const std::vector &datas) const; - void CreateBroadcastOp(Graph *result, const std::string &p_name, + void CreateBroadcastOp(ir::Graph *result, const std::string &p_name, size_t src_dev_id) const; bool IsSparseGradient(const std::string &og) const; diff --git a/paddle/fluid/framework/details/rpc_op_handle.cc b/paddle/fluid/framework/details/rpc_op_handle.cc index 924ff4d118a192a43e5828a38fd1abbaac1a8526..f44b374edb29228dff5a8bf003d945291f166d49 100644 --- a/paddle/fluid/framework/details/rpc_op_handle.cc +++ b/paddle/fluid/framework/details/rpc_op_handle.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/framework/details/rpc_op_handle.h" +#include "paddle/fluid/framework/ir/graph.h" namespace paddle { namespace framework { @@ -33,7 +34,7 @@ void RPCOpHandle::RunImpl() { for (auto *in : inputs_) { auto &p = static_cast(in)->place_; // FIXME(Yancey1989): need a better solution instead of use DebugString() - if (in->DebugString() == "dummy") { // HACK + if (ir::IsControlDepVar(*in->Node())) { // HACK continue; } if (in->GeneratedOp()) { diff --git a/paddle/fluid/framework/details/ssa_graph_builder.cc b/paddle/fluid/framework/details/ssa_graph_builder.cc index 7bc130ef6e8d2e0caf6e445d12950b87e6dd4dbd..506e7eb35cd977869424223cb863dd64dbaa9d30 100644 --- a/paddle/fluid/framework/details/ssa_graph_builder.cc +++ b/paddle/fluid/framework/details/ssa_graph_builder.cc @@ -17,7 +17,7 @@ namespace paddle { namespace framework { namespace details { -void SSAGraphBuilder::PolishGraphToSupportDataHazards(Graph *graph) { +void SSAGraphBuilder::PolishGraphToSupportDataHazards(ir::Graph *graph) { for (auto &var_map : graph->Get("vars")) { for (auto &name_pair : var_map) { if (name_pair.second.size() <= 1) { @@ -36,9 +36,18 @@ void SSAGraphBuilder::PolishGraphToSupportDataHazards(Graph *graph) { // Read Write is the same op. 
continue; } + bool has_dep = false; + for (auto *r_out : read_op->Outputs()) { + for (auto *w_in : write_op->Inputs()) { + if (r_out->Node() == w_in->Node()) { + has_dep = true; + break; + } + } + } + if (has_dep) continue; - auto *dep_var = new DummyVarHandle( - graph->CreateEmptyNode("dummy", ir::Node::Type::kVariable)); + auto *dep_var = new DummyVarHandle(graph->CreateControlDepVar()); read_op->AddOutput(dep_var); write_op->AddInput(dep_var); graph->Get("dep_vars").emplace(dep_var); @@ -49,7 +58,7 @@ void SSAGraphBuilder::PolishGraphToSupportDataHazards(Graph *graph) { } VarHandle *SSAGraphBuilder::CreateOrGetLatestVarHandle( - Graph *graph, ir::Node *node, const platform::Place &place, + ir::Graph *graph, ir::Node *node, const platform::Place &place, size_t place_offset) { auto &var_holders = graph->Get("vars")[place_offset]; auto &var_holder = var_holders[node->Name()]; @@ -70,7 +79,7 @@ VarHandle *SSAGraphBuilder::CreateOrGetLatestVarHandle( return var; } -void SSAGraphBuilder::CreateOpOutput(Graph *graph, OpHandleBase *op_handle, +void SSAGraphBuilder::CreateOpOutput(ir::Graph *graph, OpHandleBase *op_handle, ir::Node *new_node, const platform::Place &place, size_t place_offset) { @@ -82,13 +91,12 @@ void SSAGraphBuilder::CreateOpOutput(Graph *graph, OpHandleBase *op_handle, op_handle->AddOutput(var); } -void SSAGraphBuilder::AddOutputToLeafOps(Graph *graph) { +void SSAGraphBuilder::AddOutputToLeafOps(ir::Graph *graph) { for (auto &op : graph->Get("ops")) { if (!op->Outputs().empty()) { continue; } - auto *dummy_leaf = new DummyVarHandle( - graph->CreateEmptyNode("dummy", ir::Node::Type::kVariable)); + auto *dummy_leaf = new DummyVarHandle(graph->CreateControlDepVar()); graph->Get("dep_vars").emplace(dummy_leaf); op->AddOutput(dummy_leaf); } diff --git a/paddle/fluid/framework/details/ssa_graph_builder.h b/paddle/fluid/framework/details/ssa_graph_builder.h index e8e8acdb38f893302fb92c47d6f1cb2d38453e0f..2b4f31f2ff3444f909e3be5eb810ae6737e237b2 100644 --- a/paddle/fluid/framework/details/ssa_graph_builder.h +++ b/paddle/fluid/framework/details/ssa_graph_builder.h @@ -57,26 +57,23 @@ class SSAGraphBuilder : public ir::Pass { DISABLE_COPY_AND_ASSIGN(SSAGraphBuilder); protected: - /** - * We only handle write after read(WAR), since it should not have a write - * after write in program. If there are write after write operators, we need - * prune them. - * - * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR) - */ - static void PolishGraphToSupportDataHazards(Graph *graph); - - static VarHandle *CreateOrGetLatestVarHandle(Graph *graph, ir::Node *node, + /* + Dependency graph has been constructed. However, there are still data + hazards need to be handled. 
+ */ + static void PolishGraphToSupportDataHazards(ir::Graph *graph); + + static VarHandle *CreateOrGetLatestVarHandle(ir::Graph *graph, ir::Node *node, const platform::Place &place, size_t place_offset); // Add an output variable (each_var_name, place, place_offset) to op_handle, // which belongs to graph - static void CreateOpOutput(Graph *graph, OpHandleBase *op_handle, + static void CreateOpOutput(ir::Graph *graph, OpHandleBase *op_handle, ir::Node *new_node, const platform::Place &place, size_t place_offset); - static void AddOutputToLeafOps(Graph *graph); + static void AddOutputToLeafOps(ir::Graph *graph); }; } // namespace details } // namespace framework diff --git a/paddle/fluid/framework/details/ssa_graph_checker.cc b/paddle/fluid/framework/details/ssa_graph_checker.cc index 7c79d7f1e881c67514634d56caa715c41927dbce..0438b096109a287366610d06ef2bd14c765a8f43 100644 --- a/paddle/fluid/framework/details/ssa_graph_checker.cc +++ b/paddle/fluid/framework/details/ssa_graph_checker.cc @@ -20,7 +20,7 @@ namespace paddle { namespace framework { namespace details { -bool SSAGraghBuilderWithChecker::IsValidGraph(const Graph *graph) const { +bool SSAGraghBuilderWithChecker::IsValidGraph(const ir::Graph *graph) const { std::unordered_map pending_ops; std::unordered_set pending_vars; std::unordered_set ready_vars; diff --git a/paddle/fluid/framework/details/ssa_graph_checker.h b/paddle/fluid/framework/details/ssa_graph_checker.h index 2e397e86825a41765a360d31fa8595d17027f3ec..51ce6e5ecad755613551aa6525b5cfbe4a8933ef 100644 --- a/paddle/fluid/framework/details/ssa_graph_checker.h +++ b/paddle/fluid/framework/details/ssa_graph_checker.h @@ -28,7 +28,8 @@ class SSAGraghBuilderWithChecker : public SSAGraphBuilder { std::unique_ptr&& builder) : builder_(std::move(builder)) {} - std::unique_ptr Apply(std::unique_ptr graph) const override { + std::unique_ptr Apply( + std::unique_ptr graph) const override { auto new_graph = builder_->Apply(std::move(graph)); PADDLE_ENFORCE(IsValidGraph(new_graph.get())); return new_graph; @@ -38,7 +39,7 @@ class SSAGraghBuilderWithChecker : public SSAGraphBuilder { return builder_->GetVarDeviceID(var_name); } - bool IsValidGraph(const Graph* graph) const; + bool IsValidGraph(const ir::Graph* graph) const; private: std::unique_ptr builder_; diff --git a/paddle/fluid/framework/details/ssa_graph_printer.cc b/paddle/fluid/framework/details/ssa_graph_printer.cc index 6dd6fd262e35a192ba85eb3aa16660526d2ebca2..20aab1464400aa9bb1bd6af11c06269c688a8308 100644 --- a/paddle/fluid/framework/details/ssa_graph_printer.cc +++ b/paddle/fluid/framework/details/ssa_graph_printer.cc @@ -21,7 +21,7 @@ namespace framework { namespace details { template -static inline void IterAllVar(const Graph &graph, Callback callback) { +static inline void IterAllVar(const ir::Graph &graph, Callback callback) { for (auto &each : graph.Get("vars")) { for (auto &pair1 : each) { for (auto &pair2 : pair1.second) { @@ -35,7 +35,7 @@ static inline void IterAllVar(const Graph &graph, Callback callback) { } } -void GraphvizSSAGraphPrinter::Print(const Graph &graph, +void GraphvizSSAGraphPrinter::Print(const ir::Graph &graph, std::ostream &sout) const { size_t var_id = 0; std::unordered_map vars; diff --git a/paddle/fluid/framework/details/ssa_graph_printer.h b/paddle/fluid/framework/details/ssa_graph_printer.h index cd72162f44ca76aa6340606cf79a73601eae89af..a77c1bad3f15bca9064ded860696eb68b033b090 100644 --- a/paddle/fluid/framework/details/ssa_graph_printer.h +++ 
b/paddle/fluid/framework/details/ssa_graph_printer.h @@ -25,12 +25,12 @@ namespace details { class SSAGraphPrinter { public: virtual ~SSAGraphPrinter() {} - virtual void Print(const Graph& graph, std::ostream& sout) const = 0; + virtual void Print(const ir::Graph& graph, std::ostream& sout) const = 0; }; class GraphvizSSAGraphPrinter : public SSAGraphPrinter { public: - void Print(const Graph& graph, std::ostream& sout) const override; + void Print(const ir::Graph& graph, std::ostream& sout) const override; }; class SSAGraghBuilderWithPrinter : public SSAGraphBuilder { @@ -50,7 +50,8 @@ class SSAGraghBuilderWithPrinter : public SSAGraphBuilder { stream_ptr_(std::move(sout)), stream_ref_(*stream_ptr_) {} - std::unique_ptr Apply(std::unique_ptr graph) const override { + std::unique_ptr Apply( + std::unique_ptr graph) const override { auto new_graph = builder_->Apply(std::move(graph)); printer_->Print(*new_graph, stream_ref_); return new_graph; diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index f85c62dd6c4a8033a037b1e001ece6a9cc90ca98..c19f74476f9a1498a7d61f5faf204e9966aea155 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -21,7 +21,8 @@ namespace framework { namespace details { ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, - const std::vector &places, std::unique_ptr &&graph) + const std::vector &places, + std::unique_ptr &&graph) : graph_(std::move(graph)), pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) : nullptr), diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index bf7c0a367a19ff4ac9462334516f1577672faa68..3d67daa45e20fdea52689684397ad01f2f4cd783 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -40,7 +40,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph); + std::unique_ptr &&graph); // Run a SSAGraph by a thread pool // Use topological sort algorithm @@ -53,7 +53,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { details::OpHandleBase *op); private: - std::unique_ptr graph_; + std::unique_ptr graph_; std::unique_ptr<::ThreadPool> pool_; std::vector local_scopes_; std::vector places_; diff --git a/paddle/fluid/framework/details/var_handle.cc b/paddle/fluid/framework/details/var_handle.cc index 6f00abd9473a84a77ed1a39015e2ae079e00be79..5457870e9ff5d7cf67c9c7076b9aae94eeada779 100644 --- a/paddle/fluid/framework/details/var_handle.cc +++ b/paddle/fluid/framework/details/var_handle.cc @@ -26,7 +26,7 @@ std::string VarHandle::DebugString() const { return ss.str(); } -std::string DummyVarHandle::DebugString() const { return "dummy"; } +std::string DummyVarHandle::DebugString() const { return node_->Name(); } } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/CMakeLists.txt b/paddle/fluid/framework/ir/CMakeLists.txt index ee0604383ec9df826fa2abaef1f643ba0da6a096..6447452ae58344273fe569c91168c7c95a901c8d 100644 --- a/paddle/fluid/framework/ir/CMakeLists.txt +++ 
b/paddle/fluid/framework/ir/CMakeLists.txt @@ -1,5 +1,6 @@ cc_library(node SRCS node.cc DEPS proto_desc) cc_library(graph SRCS graph.cc DEPS node) +cc_library(graph_helper SRCS graph_helper.cc DEPS graph) cc_library(pass SRCS pass.cc DEPS graph node) - -cc_test(graph_test SRCS graph_test.cc DEPS graph proto_desc op_registry) +cc_test(graph_test SRCS graph_test.cc DEPS graph op_registry) +cc_test(graph_helper_test SRCS graph_helper_test.cc DEPS graph_helper op_registry) diff --git a/paddle/fluid/framework/ir/graph.cc b/paddle/fluid/framework/ir/graph.cc index e4021aa92b6da2343b604fb7bc01d31edb97d842..740acfafb7594d8d9f3ca5439323ce76c5ed271a 100644 --- a/paddle/fluid/framework/ir/graph.cc +++ b/paddle/fluid/framework/ir/graph.cc @@ -12,14 +12,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include +#include + #include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/op_proto_maker.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/var_desc.h" namespace paddle { namespace framework { +namespace ir { -// NOTE(paddle-dev): This graph contains circle. Graph::Graph(const ProgramDesc &program) : program_(program) { VLOG(3) << "block in program:" << program_.Size(); std::unordered_map all_vars; @@ -27,40 +31,87 @@ Graph::Graph(const ProgramDesc &program) : program_(program) { all_vars.emplace(var->Name(), var); } - std::map var_nodes; + std::map> var_nodes; for (auto *op : program.Block(0).AllOps()) { ir::Node *node = CreateOpNode(op); - + // For input args, reuse the same var name if it was created before. + // Otherwise, create a new one. for (auto &each_var_name : op->InputArgumentNames()) { ir::Node *var = nullptr; if (var_nodes.find(each_var_name) != var_nodes.end()) { - var = var_nodes.at(each_var_name); + var = var_nodes.at(each_var_name).back(); } else if (all_vars.count(each_var_name) != 0) { var = CreateVarNode(all_vars.at(each_var_name)); - var_nodes[each_var_name] = var; + var_nodes[each_var_name].push_back(var); } else { - // TODO(paddle-dev): Seems some assumption doesn't hold? - VLOG(3) << op->Type() - << " input var not in all_var list: " << each_var_name; + // Operation input var can be optional (dispensable). Which means + // the operation doesn't really need the var at runtime. In this + // case, the no-existed var is ready at the beginning. var = CreateEmptyNode(each_var_name, ir::Node::Type::kVariable); - var_nodes[each_var_name] = var; + var_nodes[each_var_name].push_back(var); } node->inputs.push_back(var); var->outputs.push_back(node); } - + // For output args, always create a new var. for (auto &each_var_name : op->OutputArgumentNames()) { - ir::Node *var = nullptr; - if (var_nodes.find(each_var_name) != var_nodes.end()) { - var = var_nodes.at(each_var_name); - } else { - var = CreateVarNode(all_vars.at(each_var_name)); - var_nodes[each_var_name] = var; - } + ir::Node *var = CreateVarNode(all_vars.at(each_var_name)); + var_nodes[each_var_name].push_back(var); node->outputs.push_back(var); var->inputs.push_back(node); } } + /** + * We only handle write after read(WAR), since it should not have a write + * after write in program. If there are write after write operators, we need + * prune them. 
+ * + * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR) + */ + + for (auto &var : var_nodes) { + auto &versions = var.second; + if (versions.size() <= 1) continue; + + auto it_new = versions.rbegin(); + auto it_old = versions.rbegin(); + ++it_old; + for (; it_old != versions.rend(); it_new = it_old, ++it_old) { + ir::Node *write_op = + (*it_new)->inputs.empty() ? nullptr : (*it_new)->inputs[0]; + const auto &read_ops = (*it_old)->outputs; + + for (auto *read_op : read_ops) { + // Manually add a dependency var from read_op to write_op; + if (read_op == write_op) { + // Read Write is the same op. + continue; + } + // 2 ops might have been connected via other vars. + bool has_dep = false; + for (ir::Node *r_out : read_op->outputs) { + for (ir::Node *w_in : write_op->inputs) { + if (r_out == w_in) { + has_dep = true; + break; + } + } + } + if (has_dep) continue; + + ir::Node *dep_var = CreateControlDepVar(); + read_op->outputs.push_back(dep_var); + dep_var->inputs.push_back(read_op); + write_op->inputs.push_back(dep_var); + dep_var->outputs.push_back(write_op); + } + } + } +} + +bool IsControlDepVar(const ir::Node &var) { + return var.Name().find(ir::Node::kControlDepVarName) != std::string::npos; } +} // namespace ir } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/graph.h b/paddle/fluid/framework/ir/graph.h index b4ac135b029005b723abca2cb9b9a9aa175eda40..4f59ec82a7d1217621c95d9a4a433a9af43e95da 100644 --- a/paddle/fluid/framework/ir/graph.h +++ b/paddle/fluid/framework/ir/graph.h @@ -26,13 +26,14 @@ limitations under the License. */ namespace paddle { namespace framework { +namespace ir { class Graph { public: - explicit Graph(const ProgramDesc& program); + explicit Graph(const ProgramDesc &program); virtual ~Graph() { - for (auto& attr : attrs_) { + for (auto &attr : attrs_) { attr_dels_[attr.first](); } attrs_.clear(); @@ -40,12 +41,12 @@ class Graph { } template - AttrType& Get(const std::string& attr_name) const { - return *boost::any_cast(attrs_.at(attr_name)); + AttrType &Get(const std::string &attr_name) const { + return *boost::any_cast(attrs_.at(attr_name)); } template - void Set(const std::string& attr_name, AttrType* attr) { + void Set(const std::string &attr_name, AttrType *attr) { PADDLE_ENFORCE(attrs_.count(attr_name) == 0); attrs_[attr_name] = attr; attr_dels_[attr_name] = [attr, attr_name]() { @@ -54,29 +55,70 @@ class Graph { }; } - ir::Node* CreateVarNode(VarDesc* var_desc) { - nodes.emplace_back(new ir::Node(var_desc)); - return nodes.back().get(); + const std::unordered_set &Nodes() const { return node_set_; } + + // Create a normal variable with non-null VarDesc. + ir::Node *CreateVarNode(VarDesc *var_desc) { + return AddNode(new ir::Node(var_desc)); + } + + // Create a normal runnable operator with OpDesc. + ir::Node *CreateOpNode(OpDesc *op_desc) { + return AddNode(new ir::Node(op_desc)); } - ir::Node* CreateOpNode(OpDesc* op_desc) { - nodes.emplace_back(new ir::Node(op_desc)); - return nodes.back().get(); + // Create a control dependency var that connects 2 operations. The + // var doesn't hold any data. Other than that, it's no different from + // other var, considering dependency analysis. + ir::Node *CreateControlDepVar() { + // TODO(panyx0718): control var name should be really unique. 
+ const std::string name = string::Sprintf( + "%s@%llu", ir::Node::kControlDepVarName, node_set_.size()); + return AddNode(new ir::Node(name, ir::Node::Type::kVariable)); } - ir::Node* CreateEmptyNode(const std::string& name, ir::Node::Type type) { - nodes.emplace_back(new ir::Node(name, type)); - return nodes.back().get(); + // A more free style way of creating a graph node. Mostly use for test + // or "copy" from another node. Avoid using it if possible. + ir::Node *CreateEmptyNode(const std::string &name, ir::Node::Type type) { + return AddNode(new ir::Node(name, type)); } - std::vector> nodes; + // Clear all node information of the graph and return the ownership of the + // nodes. + std::vector> ReleaseNodes() { + std::vector> ret; + for (auto &n : nodes_) { + ret.emplace_back(n.second.release()); + } + nodes_.clear(); + node_set_.clear(); + return ret; + } private: + // This method takes ownership of `node`. + ir::Node *AddNode(ir::Node *node) { + PADDLE_ENFORCE(node_set_.find(node) == node_set_.end()); + nodes_[node].reset(node); + node_set_.insert(node); + return node; + } + + void RemoveNode(ir::Node *node) { + PADDLE_ENFORCE(node_set_.find(node) != node_set_.end()); + node_set_.erase(node); + nodes_.erase(node); + } + // NOTE: program_ shouldn't be exposed to user. - const ProgramDesc& program_; + const ProgramDesc &program_; std::map attrs_; std::map> attr_dels_; + std::map> nodes_; + std::unordered_set node_set_; }; +bool IsControlDepVar(const ir::Node &var); +} // namespace ir } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/graph_helper.cc b/paddle/fluid/framework/ir/graph_helper.cc new file mode 100644 index 0000000000000000000000000000000000000000..b1c19e6535150130822e9f48685241e62de5b064 --- /dev/null +++ b/paddle/fluid/framework/ir/graph_helper.cc @@ -0,0 +1,118 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include +#include + +#include "paddle/fluid/framework/ir/graph_helper.h" + +namespace paddle { +namespace framework { +namespace ir { +namespace { +void SortHelper( + const std::map> &adj_list, + ir::Node *node, std::unordered_set *visited, + std::vector *ret) { + visited->insert(node); + + for (auto adj : adj_list.at(node)) { + if (visited->find(adj) == visited->end()) { + SortHelper(adj_list, adj, visited, ret); + } + } + + VLOG(3) << "topology sort insert: " << node->Name() + << reinterpret_cast(node) << " input " << node->inputs.size(); + ret->push_back(node); +} + +bool HasCircleHelper( + ir::Node *node, + const std::map> &adj_list, + std::unordered_set *visited, + std::unordered_set *in_trace) { + if (visited->find(node) == visited->end()) { + visited->insert(node); + in_trace->insert(node); + + for (ir::Node *in : adj_list.at(node)) { + if (visited->find(in) == visited->end() && + HasCircleHelper(in, adj_list, visited, in_trace)) { + return true; + } else if (in_trace->find(in) != in_trace->end()) { + return true; + } + } + } + in_trace->erase(node); + return false; +} + +bool HasCircleInternal( + const std::map> &adj_list) { + std::unordered_set visited; + std::unordered_set in_trace; + for (auto &adj : adj_list) { + if (HasCircleHelper(adj.first, adj_list, &visited, &in_trace)) { + return true; + } + } + return false; +} +} // namespace + +bool HasCircle(const Graph &graph) { + return HasCircleInternal(BuildOperationAdjList(graph)); +} + +std::vector TopologySortOperations(const Graph &graph) { + std::map> adj_list = + BuildOperationAdjList(graph); + PADDLE_ENFORCE(!HasCircleInternal(adj_list)); + std::unordered_set visited; + std::vector ret; + for (auto adj : adj_list) { + if (visited.find(adj.first) == visited.end()) { + SortHelper(adj_list, adj.first, &visited, &ret); + } + } + return ret; +} + +std::map> BuildOperationAdjList( + const Graph &graph) { + std::map> adj_list; + + for (auto &n : graph.Nodes()) { + if (n->NodeType() != ir::Node::Type::kOperation) continue; + if (adj_list.find(n) == adj_list.end()) { + adj_list[n] = std::unordered_set(); + } + for (auto &var : n->inputs) { + for (auto &adj_n : var->inputs) { + PADDLE_ENFORCE(adj_n->NodeType() == ir::Node::Type::kOperation); + adj_list[n].insert(adj_n); + VLOG(3) << "adj " << adj_n->Name() << reinterpret_cast(adj_n) + << " -> " << n->Name() << reinterpret_cast(n) + << " via " << var->Name() << reinterpret_cast(var); + } + } + } + return adj_list; +} + +} // namespace ir +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/ir/graph_helper.h b/paddle/fluid/framework/ir/graph_helper.h new file mode 100644 index 0000000000000000000000000000000000000000..cd6c53a07f8f56781989739d995226bd02b3d3d0 --- /dev/null +++ b/paddle/fluid/framework/ir/graph_helper.h @@ -0,0 +1,40 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#pragma once + +#include +#include +#include + +#include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/ir/node.h" + +namespace paddle { +namespace framework { +namespace ir { +// Test if the graph contains circle. +bool HasCircle(const Graph &graph); + +// Topology Sort the operations in the graph from inputs to outputs. +// `graph` cannot contain circle. +std::vector TopologySortOperations(const Graph &graph); + +// Build an adjacency list of operations for the `graph`. +std::map> BuildOperationAdjList( + const Graph &graph); + +} // namespace ir +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/ir/graph_helper_test.cc b/paddle/fluid/framework/ir/graph_helper_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..b517442bb73f43bc1cb1d639b6c6cf004b28d4cf --- /dev/null +++ b/paddle/fluid/framework/ir/graph_helper_test.cc @@ -0,0 +1,125 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include "paddle/fluid/framework/ir/graph.h" +#include +#include "gtest/gtest.h" +#include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/framework/program_desc.h" + +namespace paddle { +namespace framework { +namespace ir { + +void BuildCircleGraph(Graph* g) { + ir::Node* o1 = g->CreateEmptyNode("op1", Node::Type::kOperation); + ir::Node* v1 = g->CreateEmptyNode("var1", Node::Type::kVariable); + + o1->outputs.push_back(v1); + o1->inputs.push_back(v1); + v1->inputs.push_back(o1); + v1->outputs.push_back(o1); +} + +void BuildCircleGraph2(Graph* g) { + ir::Node* o1 = g->CreateEmptyNode("op1", Node::Type::kOperation); + ir::Node* o2 = g->CreateEmptyNode("op2", Node::Type::kOperation); + ir::Node* v1 = g->CreateEmptyNode("var1", Node::Type::kVariable); + ir::Node* v2 = g->CreateEmptyNode("var2", Node::Type::kVariable); + + o1->outputs.push_back(v1); + o2->inputs.push_back(v1); + v1->inputs.push_back(o1); + v1->outputs.push_back(o2); + + o2->outputs.push_back(v2); + o1->inputs.push_back(v2); + v2->inputs.push_back(o2); + v2->outputs.push_back(o1); +} + +void BuildNoCircleGraph(Graph* g) { + ir::Node* o1 = g->CreateEmptyNode("op1", Node::Type::kOperation); + ir::Node* o2 = g->CreateEmptyNode("op2", Node::Type::kOperation); + ir::Node* o3 = g->CreateEmptyNode("op3", Node::Type::kOperation); + ir::Node* o4 = g->CreateEmptyNode("op4", Node::Type::kOperation); + ir::Node* o5 = g->CreateEmptyNode("op5", Node::Type::kOperation); + ir::Node* v1 = g->CreateEmptyNode("var1", Node::Type::kVariable); + ir::Node* v2 = g->CreateEmptyNode("var2", Node::Type::kVariable); + ir::Node* v3 = g->CreateEmptyNode("var3", Node::Type::kVariable); + ir::Node* v4 = g->CreateEmptyNode("var4", Node::Type::kVariable); + + // o1->v1->o2 + o1->outputs.push_back(v1); + o2->inputs.push_back(v1); + v1->inputs.push_back(o1); + v1->outputs.push_back(o2); + // o2->v2->o3 + // o2->v2->o4 + o2->outputs.push_back(v2); + o3->inputs.push_back(v2); + o4->inputs.push_back(v2); + v2->inputs.push_back(o2); + 
v2->outputs.push_back(o3); + v2->outputs.push_back(o4); + // o2->v3->o5 + o2->outputs.push_back(v3); + o5->inputs.push_back(v3); + v3->inputs.push_back(o2); + v3->outputs.push_back(o5); + // o3-v4->o5 + o3->outputs.push_back(v4); + o5->inputs.push_back(v4); + v4->inputs.push_back(o3); + v4->outputs.push_back(o5); +} + +TEST(GraphHelperTest, Basic) { + ProgramDesc prog; + + Graph g(prog); + BuildCircleGraph(&g); + ASSERT_TRUE(HasCircle(g)); + + Graph g2(prog); + BuildCircleGraph2(&g2); + ASSERT_TRUE(HasCircle(g2)); + + auto adj_list = BuildOperationAdjList(g2); + for (auto& adj : adj_list) { + auto& adj_set = adj.second; + if (adj.first->Name() == "op1") { + ASSERT_EQ((*adj_set.begin())->Name(), "op2"); + } else if (adj.first->Name() == "op2") { + ASSERT_EQ((*adj_set.begin())->Name(), "op1"); + } else { + ASSERT_TRUE(false); + } + } + + Graph g3(prog); + BuildNoCircleGraph(&g3); + ASSERT_FALSE(HasCircle(g3)); + auto sorted = TopologySortOperations(g3); + std::map node_map; + for (size_t i = 0; i < sorted.size(); ++i) { + node_map[sorted[i]->Name()] = i; + } + ASSERT_EQ(node_map.at("op1"), 0); + ASSERT_EQ(node_map.at("op2"), 1); + ASSERT_TRUE(node_map.at("op3") < node_map.at("op5")); +} +} // namespace ir +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/ir/graph_test.cc b/paddle/fluid/framework/ir/graph_test.cc index 4e23bf124f8822e25be0f6b1c7c8c5de4e4f600a..73ef55756c330bdbc3be89c436967b2a88625a43 100644 --- a/paddle/fluid/framework/ir/graph_test.cc +++ b/paddle/fluid/framework/ir/graph_test.cc @@ -76,6 +76,7 @@ TEST(GraphTest, Basic) { op->SetType("sum"); op->SetInput("X", {"test_a", "test_b", "test_c"}); op->SetOutput("Out", {"test_out"}); + op->SetAttr("op_role", 1); prog.MutableBlock(0)->Var("test_a")->SetType(proto::VarType::SELECTED_ROWS); prog.MutableBlock(0)->Var("test_b")->SetType(proto::VarType::SELECTED_ROWS); @@ -92,21 +93,22 @@ TEST(GraphTest, Basic) { ASSERT_EQ(proto::VarType::LOD_TENSOR, prog.MutableBlock(0)->Var("test_out")->GetType()); - std::unique_ptr g(new Graph(prog)); - ASSERT_EQ(g->nodes[0]->Name(), "sum"); - ASSERT_EQ(g->nodes[0]->inputs[0]->Name(), "test_a"); - ASSERT_EQ(g->nodes[0]->inputs[1]->Name(), "test_b"); - ASSERT_EQ(g->nodes[0]->inputs[2]->Name(), "test_c"); - ASSERT_EQ(g->nodes[0]->outputs[0]->Name(), "test_out"); - ASSERT_EQ(g->nodes[1]->Name(), "test_a"); - ASSERT_EQ(g->nodes[1]->outputs[0]->Name(), "sum"); - ASSERT_EQ(g->nodes[2]->Name(), "test_b"); - ASSERT_EQ(g->nodes[2]->outputs[0]->Name(), "sum"); - ASSERT_EQ(g->nodes[3]->Name(), "test_c"); - ASSERT_EQ(g->nodes[3]->outputs[0]->Name(), "sum"); - ASSERT_EQ(g->nodes[4]->Name(), "test_out"); - ASSERT_EQ(g->nodes[4]->inputs[0]->Name(), "sum"); - ASSERT_EQ(g->nodes.size(), 5); + std::unique_ptr g(new ir::Graph(prog)); + std::vector nodes(g->Nodes().begin(), g->Nodes().end()); + for (ir::Node *n : nodes) { + if (n->Name() == "sum") { + ASSERT_EQ(n->inputs.size(), 3); + ASSERT_EQ(n->outputs.size(), 1); + } else if (n->Name() == "test_a" || n->Name() == "test_b" || + n->Name() == "test_c") { + ASSERT_EQ(n->inputs.size(), 0); + ASSERT_EQ(n->outputs.size(), 1); + } else if (n->Name() == "test_out") { + ASSERT_EQ(n->inputs.size(), 1); + ASSERT_EQ(n->outputs.size(), 0); + } + } + ASSERT_EQ(nodes.size(), 5); } } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/node.cc b/paddle/fluid/framework/ir/node.cc index 86376e7e8bc8bee2ddbc18f7f24bcdd849a06cbf..aca77da8d674f29b89c023717cdcd061232d023a 100644 --- a/paddle/fluid/framework/ir/node.cc +++ 
b/paddle/fluid/framework/ir/node.cc @@ -15,5 +15,9 @@ limitations under the License. */ #include "paddle/fluid/framework/ir/node.h" namespace paddle { -namespace framework {} // namespace framework +namespace framework { +namespace ir { +const char Node::kControlDepVarName[] = "__control_var"; +} // namespace ir +} // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/node.h b/paddle/fluid/framework/ir/node.h index b98c29b81ddc2f57553b8fe76fcfeb0936ddd837..b3138fccee86fb274abe72007961fc1c982b1e96 100644 --- a/paddle/fluid/framework/ir/node.h +++ b/paddle/fluid/framework/ir/node.h @@ -27,6 +27,8 @@ namespace ir { class Node { public: enum class Type { kOperation, kVariable }; + static const char kControlDepVarName[]; + explicit Node(const std::string& name, Type type) : name_(name), var_desc_(nullptr), op_desc_(nullptr), type_(type) {} @@ -50,6 +52,7 @@ class Node { PADDLE_ENFORCE(type_ == Type::kVariable); return var_desc_; } + OpDesc* Op() { PADDLE_ENFORCE(type_ == Type::kOperation); return op_desc_; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 1e5bba62b53025dacdbf2d74b35f266cf4e422c2..02c836bea194553bb9c4bc5677cc408dd302e9ce 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -132,7 +132,7 @@ ParallelExecutor::ParallelExecutor( #endif } builder_ = builder_factory.Create(); - std::unique_ptr graph(new Graph(main_program)); + std::unique_ptr graph(new ir::Graph(main_program)); graph = builder_->Apply(std::move(graph)); member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, places, std::move(graph))); diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index 4e2002ad24415437ae4f85eba0e90a6c689e2996..9b56ad4c55e35d497aa7abe4e1da3867a2084b88 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -192,9 +192,9 @@ if(WITH_DISTRIBUTE) set(DISTRIBUTE_DEPS "") if(WITH_GRPC) - set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf) + set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf node) else() - set(DISTRIBUTE_DEPS sendrecvop_brpc brpc leveldb snappystream snappy protobuf ssl crypto zlib) + set(DISTRIBUTE_DEPS sendrecvop_brpc brpc leveldb snappystream snappy protobuf ssl crypto zlib node) if(WITH_BRPC_RDMA) find_library(IBVERBS_LIBRARY NAMES ibverbs) ADD_LIBRARY(ibverbs SHARED IMPORTED GLOBAL) diff --git a/paddle/fluid/operators/send_recv_util.h b/paddle/fluid/operators/send_recv_util.h index deab005149027caffa962783df944fad7110382f..dc26c53c64f06ce21856fb5af8f2a5eb3fc75bb7 100644 --- a/paddle/fluid/operators/send_recv_util.h +++ b/paddle/fluid/operators/send_recv_util.h @@ -14,6 +14,7 @@ limitations under the License. */ #pragma once #include +#include "paddle/fluid/framework/ir/node.h" namespace paddle { namespace operators { @@ -22,7 +23,10 @@ inline bool NeedSend(const framework::Scope& scope, const std::string& varname) { // dummy variable is only used in parallel executor to represent // some dependency relationship, we don't need to send/recv it. - if (varname == "dummy") return false; + // TODO(paddle-dev): Why would parallel executor logic leaked into here? 
+ if (varname.find(framework::ir::Node::kControlDepVarName) != + std::string::npos) + return false; auto* var = scope.FindVar(varname); PADDLE_ENFORCE_NOT_NULL(var, "Can not find variable '%s' in the send side.", varname);
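For reference, here is a minimal usage sketch of the `ir::Graph` / `graph_helper` API this patch introduces. It is illustrative only, not part of the change: the helper function name `DumpOpsInTopologicalOrder` is hypothetical, and the includes are assumed to pull in `PADDLE_ENFORCE` and `VLOG` as they do elsewhere in the tree. It mirrors what `graph_helper_test.cc` exercises: build a graph from a `ProgramDesc`, check that it is acyclic, then visit operation nodes in topological order.

```cpp
#include <vector>

#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
#include "paddle/fluid/framework/program_desc.h"

namespace paddle {
namespace framework {

// Hypothetical helper: walk the operations of a program in dependency order.
void DumpOpsInTopologicalOrder(const ProgramDesc &prog) {
  // The Graph constructor turns block 0's ops/vars into ir::Node and inserts
  // control-dependency vars to serialize write-after-read hazards.
  ir::Graph graph(prog);

  // Builders downstream rely on the graph being acyclic before sorting.
  PADDLE_ENFORCE(!ir::HasCircle(graph));

  // TopologySortOperations returns operation nodes only, ordered from
  // inputs to outputs.
  std::vector<ir::Node *> ops = ir::TopologySortOperations(graph);
  for (ir::Node *op : ops) {
    VLOG(3) << "op in topo order: " << op->Name();
  }
}

}  // namespace framework
}  // namespace paddle
```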