diff --git a/doc/fluid/design/ir/draft.md b/doc/fluid/design/ir/draft.md
index a141dcbca584c6064c8da863410692a8be911d12..a33b5a9c9312c93247a1e1f3431061a5aad6c884 100644
--- a/doc/fluid/design/ir/draft.md
+++ b/doc/fluid/design/ir/draft.md
@@ -1,16 +1,16 @@
## Motivation

-There is a ```gap``` between the ```Program``` defined by
-user and the ```Executable``` that can be scheduled
+There is a `gap` between the `Program` defined by
+user and the `Executable` that can be scheduled
efficiently on heterogeneous hardware, either locally or in a distributed setting.

-Usually, the ```gap``` is bridged by
+Usually, the `gap` is bridged by

* A series of transformations with a defined order.

* These transformations usually involve
-```insert, delete, clustering, split, dependency analysis```.
+`insert, delete, clustering, split, dependency analysis`.

* A simple way to verify and debug each transformation.

@@ -38,44 +38,44 @@ design below.

#### Node

-```Node``` represents an operation that performs some computation or
+`Node` represents an operation that performs some computation or
a variable that is an input or output of an operation.

-```Node```s are connected to other ```Node```s via inputs and outputs.
+`Node`s are connected to other `Node`s via inputs and outputs.

Other properties (maybe device placement information) can be added
-to ```Node``` in the future if it's a
-common requirement of many other ```Pass```es. Otherwise, it should live
-in a ```Node``` wrapper class that is private to some ```Pass``` or be
-a local member of a ```Pass```.
+to `Node` in the future if it's a
+common requirement of many other `Pass`es. Otherwise, it should live
+in a `Node` wrapper class that is private to some `Pass` or be
+a local member of a `Pass`.

#### Graph

-```Graph``` contains a list of ```Node```s, which are connected to
+`Graph` contains a list of `Node`s, which are connected to
each other via inputs and outputs.

TODO: Better definitions for the graph.

-```Graph``` can also contain ```Attribute```s. ```Attribute```s
-can be ``any`` thing. For example, it can be a list of "wraper"
-nodes. The ```wrapper``` nodes compose ```Node```s and provide
-helper method for execution or transformation. ```Attribute```
+`Graph` can also contain `Attribute`s. `Attribute`s
+can be `any` thing. For example, it can be a list of "wrapper"
+nodes. The `wrapper` nodes compose `Node`s and provide
+helper methods for execution or transformation. `Attribute`
can also contain other things that describe some properties of
-the ```Graph``` or ```Graph``` nodes. ```Attribute``` can be passed
-across ```Pass```. However, it should be used with care.
+the `Graph` or `Graph` nodes. `Attribute` can be passed
+across `Pass`es. However, it should be used with care.

#### Pass

-```Pass``` represents a transformation of ```Graph```. Its input
-is a ```Graph``` and its output is also a ```Graph```. For example,
-a ```Pass``` can simply print out the ```Graph```. A ```Pass```
-can also fuse some ```Graph```'s ```Node```s.
+`Pass` represents a transformation of `Graph`. Its input
+is a `Graph` and its output is also a `Graph`. For example,
+a `Pass` can simply print out the `Graph`. A `Pass`
+can also fuse some `Graph`'s `Node`s.

#### Optimize

-```Optimize``` contains a series of ```Pass``` with defined order.
-```Optimize``` transforms a ```Graph``` that only contains raw
-modeling logic to a ```Graph``` that can be run efficiently while
+`Optimize` contains a series of `Pass`es with a defined order.
+`Optimize` transforms a `Graph` that only contains raw +modeling logic to a `Graph` that can be run efficiently while maintaining the original modeling logic. diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 08147fdccd51f6899765cae3b5109e68ed27e936..6efb03dabe89b28f3ff1a55c4a940dfe74e8001d 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -35,8 +35,7 @@ paddle.fluid.program_guard ArgSpec(args=[], varargs='args', keywords='kwds', def paddle.fluid.get_var ArgSpec(args=['name', 'program'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.Executor.__init__ ArgSpec(args=['self', 'place'], varargs=None, keywords=None, defaults=None) paddle.fluid.Executor.as_lodtensor ArgSpec(args=['self', 'data'], varargs=None, keywords=None, defaults=None) -paddle.fluid.Executor.begin_pass ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) -paddle.fluid.Executor.end_pass ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) +paddle.fluid.Executor.close ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.Executor.run ArgSpec(args=['self', 'program', 'feed', 'fetch_list', 'feed_var_name', 'fetch_var_name', 'scope', 'return_numpy', 'use_program_cache'], varargs=None, keywords=None, defaults=(None, None, None, 'feed', 'fetch', None, True, False)) paddle.fluid.global_scope ArgSpec(args=[], varargs=None, keywords=None, defaults=None) paddle.fluid.scope_guard ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None) @@ -200,31 +199,23 @@ paddle.fluid.layers.argsort ArgSpec(args=['input', 'axis', 'name'], varargs=None paddle.fluid.layers.ones ArgSpec(args=['shape', 'dtype', 'force_cpu'], varargs=None, keywords=None, defaults=(False,)) paddle.fluid.layers.zeros ArgSpec(args=['shape', 'dtype', 'force_cpu'], varargs=None, keywords=None, defaults=(False,)) paddle.fluid.layers.reverse ArgSpec(args=['x', 'axis'], varargs=None, keywords=None, defaults=None) -paddle.fluid.layers.split_lod_tensor ArgSpec(args=['input', 'mask', 'level'], varargs=None, keywords=None, defaults=(0,)) -paddle.fluid.layers.merge_lod_tensor ArgSpec(args=['in_true', 'in_false', 'x', 'mask', 'level'], varargs=None, keywords=None, defaults=(0,)) paddle.fluid.layers.While.__init__ ArgSpec(args=['self', 'cond', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.While.block ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.While.complete ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.Switch.__init__ ArgSpec(args=['self', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.Switch.case ArgSpec(args=['self', 'condition'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.Switch.default ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) -paddle.fluid.layers.lod_rank_table ArgSpec(args=['x', 'level'], varargs=None, keywords=None, defaults=(0,)) -paddle.fluid.layers.max_sequence_len ArgSpec(args=['rank_table'], varargs=None, keywords=None, defaults=None) -paddle.fluid.layers.lod_tensor_to_array ArgSpec(args=['x', 'table'], varargs=None, keywords=None, defaults=None) -paddle.fluid.layers.array_to_lod_tensor ArgSpec(args=['x', 'table'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.increment ArgSpec(args=['x', 'value', 'in_place'], varargs=None, keywords=None, defaults=(1.0, True)) paddle.fluid.layers.array_write ArgSpec(args=['x', 'i', 'array'], varargs=None, keywords=None, 
defaults=(None,)) paddle.fluid.layers.create_array ArgSpec(args=['dtype'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.less_than ArgSpec(args=['x', 'y', 'force_cpu', 'cond'], varargs=None, keywords='ignored', defaults=(None, None)) paddle.fluid.layers.equal ArgSpec(args=['x', 'y', 'cond'], varargs=None, keywords='ignored', defaults=(None,)) paddle.fluid.layers.array_read ArgSpec(args=['array', 'i'], varargs=None, keywords=None, defaults=None) -paddle.fluid.layers.shrink_memory ArgSpec(args=['x', 'i', 'table'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.array_length ArgSpec(args=['array'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.IfElse.__init__ ArgSpec(args=['self', 'cond', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.IfElse.false_block ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.IfElse.input ArgSpec(args=['self', 'x'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.IfElse.output ArgSpec(args=['self'], varargs='outs', keywords=None, defaults=None) -paddle.fluid.layers.IfElse.parent_block ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.IfElse.true_block ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.DynamicRNN.__init__ ArgSpec(args=['self', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.DynamicRNN.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None) @@ -233,9 +224,6 @@ paddle.fluid.layers.DynamicRNN.output ArgSpec(args=['self'], varargs='outputs', paddle.fluid.layers.DynamicRNN.static_input ArgSpec(args=['self', 'x'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.DynamicRNN.step_input ArgSpec(args=['self', 'x'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.DynamicRNN.update_memory ArgSpec(args=['self', 'ex_mem', 'new_mem'], varargs=None, keywords=None, defaults=None) -paddle.fluid.layers.ConditionalBlock.__init__ ArgSpec(args=['self', 'inputs', 'is_scalar_condition', 'name'], varargs=None, keywords=None, defaults=(False, None)) -paddle.fluid.layers.ConditionalBlock.block ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) -paddle.fluid.layers.ConditionalBlock.complete ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.StaticRNN.__init__ ArgSpec(args=['self', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.StaticRNN.complete_op ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.StaticRNN.memory ArgSpec(args=['self', 'init', 'shape', 'batch_ref', 'init_value', 'init_batch_dim_idx', 'ref_batch_dim_idx'], varargs=None, keywords=None, defaults=(None, None, None, 0.0, 0, 1)) diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt index de06c860f550641a58a32d49e85feb7278fed1dd..93ec047c8012e41cc9dfb651e8de2b4749f93299 100644 --- a/paddle/fluid/framework/CMakeLists.txt +++ b/paddle/fluid/framework/CMakeLists.txt @@ -22,7 +22,12 @@ endif() cc_test(eigen_test SRCS eigen_test.cc DEPS tensor) -nv_test(mixed_vector_test SRCS mixed_vector_test.cu DEPS place memory device_context tensor) +if(WITH_GPU) + nv_test(mixed_vector_test SRCS mixed_vector_test.cc mixed_vector_test.cu DEPS place memory device_context tensor) +else() + cc_test(mixed_vector_test SRCS mixed_vector_test.cc DEPS place memory device_context tensor) +endif() + 
cc_library(lod_tensor SRCS lod_tensor.cc DEPS ddim place tensor framework_proto recordio) cc_test(lod_tensor_test SRCS lod_tensor_test.cc DEPS lod_tensor memory) nv_test(lod_tensor_gpu_test SRCS lod_tensor_test.cu DEPS lod_tensor) diff --git a/paddle/fluid/framework/details/CMakeLists.txt b/paddle/fluid/framework/details/CMakeLists.txt index df55b3d05402f1aeecfd8d4218a637a81d58ed87..9df7df1f42886d40210b16aa2ae5823e3310bfe7 100644 --- a/paddle/fluid/framework/details/CMakeLists.txt +++ b/paddle/fluid/framework/details/CMakeLists.txt @@ -1,11 +1,11 @@ -cc_library(var_handle SRCS var_handle.cc DEPS place framework_proto) +cc_library(var_handle SRCS var_handle.cc DEPS place framework_proto node) cc_library(op_handle_base SRCS op_handle_base.cc DEPS var_handle device_context lod_tensor) cc_library(scale_loss_grad_op_handle SRCS scale_loss_grad_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory) cc_library(fetch_op_handle SRCS fetch_op_handle.cc DEPS op_handle_base scope lod_tensor ddim memory) cc_library(computation_op_handle SRCS computation_op_handle.cc DEPS framework_proto scope place operator op_registry) cc_library(rpc_op_handle SRCS rpc_op_handle.cc DEPS framework_proto scope place operator op_registry) -cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS graph) +cc_library(ssa_graph_builder SRCS ssa_graph_builder.cc DEPS graph graph_helper) cc_library(ssa_graph_printer SRCS ssa_graph_printer.cc DEPS ssa_graph_builder) cc_library(ssa_graph_checker SRCS ssa_graph_checker.cc DEPS ssa_graph_builder) diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.cc b/paddle/fluid/framework/details/multi_devices_graph_builder.cc index f1f8674caf663ce38df5a2eecbcf690b5ca87dc4..22f0cb20d01cc5b40325ec37a8c7cd44105bc6c6 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.cc @@ -25,6 +25,7 @@ #include "paddle/fluid/framework/details/reduce_op_handle.h" #include "paddle/fluid/framework/details/rpc_op_handle.h" #include "paddle/fluid/framework/details/scale_loss_grad_op_handle.h" +#include "paddle/fluid/framework/ir/graph_helper.h" #include "paddle/fluid/framework/ir/node.h" #include "paddle/fluid/framework/op_info.h" #include "paddle/fluid/framework/scope.h" @@ -67,7 +68,8 @@ MultiDevSSAGraphBuilder::MultiDevSSAGraphBuilder( } } -void MultiDevSSAGraphBuilder::CreateOpHandleIOs(Graph *result, ir::Node *node, +void MultiDevSSAGraphBuilder::CreateOpHandleIOs(ir::Graph *result, + ir::Node *node, size_t place_id) const { auto p = places_[place_id]; auto *op_handle = result->Get("ops").back().get(); @@ -92,12 +94,11 @@ void MultiDevSSAGraphBuilder::CreateOpHandleIOs(Graph *result, ir::Node *node, } std::vector MultiDevSSAGraphBuilder::FindDistTrainSendVars( - const std::vector> &nodes) const { + const std::vector &nodes) const { std::vector send_vars; // since parameters are all in block 0, // it's enough to only scan send ops in block 0 for (auto &node : nodes) { - if (node->NodeType() != ir::Node::Type::kOperation) continue; OpDesc *op = node->Op(); // TODO(Yancey1989): use a graceful method to find send op, // instead of the the hard code string @@ -112,10 +113,9 @@ std::vector MultiDevSSAGraphBuilder::FindDistTrainSendVars( } std::vector MultiDevSSAGraphBuilder::FindDistTrainRecvVars( - const std::vector> &nodes) const { + const std::vector &nodes) const { std::vector recv_vars; for (auto &node : nodes) { - if (node->NodeType() != ir::Node::Type::kOperation) continue; OpDesc *op = 
node->Op(); // TODO(Yancey1989): use a graceful method to find recv op, // instead of the hard code string @@ -170,6 +170,7 @@ size_t MultiDevSSAGraphBuilder::GetAppropriateDeviceID( const std::vector &var_names) const { int64_t numel_sum = 0; for (auto var_name : var_names) { + if (all_vars_.find(var_name) == all_vars_.end()) continue; auto var_desc = all_vars_.at(var_name); PADDLE_ENFORCE_NOT_NULL(var_desc); auto dim = framework::make_ddim(var_desc->GetShape()); @@ -186,19 +187,70 @@ size_t MultiDevSSAGraphBuilder::GetAppropriateDeviceID( return dev_id; } -std::unique_ptr MultiDevSSAGraphBuilder::Apply( - std::unique_ptr graph) const { - // Rebuild the graph structure. - auto nodes = std::move(graph->nodes); - graph->nodes.clear(); +// Topology sort the graph nodes from inputs to outputs. +// Since SSAGraphBuilder depends on forward/backward nodes to assign devices +// to parameter/gradients before optimizer ops, topo sort is insufficient. ( +// some optimizer ops might not depend on any nodes), we manually move all +// optimizer nodes after last backward nodes. +// However, the assumption by SSAGraphBuilder should be relaxed in the future. +std::vector SortOpsAndDelayOptimizeOp(const ir::Graph &graph) { + std::vector ret = ir::TopologySortOperations(graph); + size_t last_backward = 0; + for (size_t i = 0; i < ret.size(); ++i) { + if (boost::get( + ret[i]->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())) == + static_cast(OpRole::kBackward)) { + last_backward = i; + } + } + + std::vector optimize_ops; + std::vector sorted_ret; + for (size_t i = 0; i < ret.size(); ++i) { + if (i < last_backward) { + if (boost::get(ret[i]->Op()->GetAttr( + OpProtoAndCheckerMaker::OpRoleAttrName())) == + static_cast(OpRole::kOptimize)) { + optimize_ops.push_back(ret[i]); + } else { + sorted_ret.push_back(ret[i]); + } + } else if (i == last_backward) { + sorted_ret.push_back(ret[i]); + // Verify that no operations before optimize ops depends on optimize ops. + std::unordered_set optimize_set(optimize_ops.begin(), + optimize_ops.end()); + for (ir::Node *n : sorted_ret) { + for (ir::Node *in : n->inputs) { + for (ir::Node *pre_n : in->inputs) { + PADDLE_ENFORCE(optimize_set.find(pre_n) == optimize_set.end(), + "optimize operations cannot be depended by forward " + "or backward node %s -> %s", + pre_n->Name(), n->Name()); + } + } + } + sorted_ret.insert(sorted_ret.end(), optimize_ops.begin(), + optimize_ops.end()); + } else { + sorted_ret.push_back(ret[i]); + } + } + return sorted_ret; +} + +std::unique_ptr MultiDevSSAGraphBuilder::Apply( + std::unique_ptr graph) const { + // Give the topology sort order and rebuild the graph structure. + std::vector sorted_ops = SortOpsAndDelayOptimizeOp(*graph); + auto nodes = graph->ReleaseNodes(); + ir::Graph &result = *graph; for (auto &node : nodes) { - if (node->NodeType() == ir::Node::Type::kVariable) { + if (node->NodeType() == ir::Node::Type::kVariable && node->Var()) { all_vars_.emplace(node->Name(), node->Var()); } } - - Graph &result = *graph; std::unordered_set og_has_been_broadcast; // We cannot invoke resize. 
It is a bug of GCC 4.8 @@ -208,8 +260,8 @@ std::unique_ptr MultiDevSSAGraphBuilder::Apply( // find send/recv vars so that we can place the distributed training // realted op in the place 0 - auto send_vars = FindDistTrainSendVars(nodes); - auto recv_vars = FindDistTrainRecvVars(nodes); + auto send_vars = FindDistTrainSendVars(sorted_ops); + auto recv_vars = FindDistTrainRecvVars(sorted_ops); std::vector> bcast_var_name_set; bcast_var_name_set.resize(places_.size()); @@ -217,22 +269,18 @@ std::unique_ptr MultiDevSSAGraphBuilder::Apply( size_t cur_device_id = 0; bool is_forwarding = true; - // NOTE: Currently, passes before SSAGraphBuilder cannot reorder - // forward, backward nodes. E.g. you can't append an forward node - // at the end of the node list. - // TODO(panyx0718): FIXME: Needs to sort by forward->backward order. - for (auto &node : nodes) { - if (node->NodeType() != ir::Node::Type::kOperation) continue; + for (ir::Node *node : sorted_ops) { if (boost::get( node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())) == static_cast(OpRole::kRPC)) { - CreateRPCOp(&result, node.get()); - } else if (IsDistTrainOp(node.get(), send_vars, recv_vars)) { - CreateDistTrainOp(&result, node.get()); - } else if (IsScaleLossOp(node.get())) { + CreateRPCOp(&result, node); + } else if (IsDistTrainOp(node, send_vars, recv_vars)) { + CreateDistTrainOp(&result, node); + } else if (IsScaleLossOp(node)) { // user can customize loss@grad if not use_default_grad_scale_ if (strategy_.gradient_scale_ != BuildStrategy::GradientScaleStrategy::kCustomized) { + // TODO(paddle-dev): Why is there no input for this op_handle? CreateScaleLossGradOp(&result); } // This assumes the backward generating code will ensure IsScaleLossOp @@ -241,24 +289,23 @@ std::unique_ptr MultiDevSSAGraphBuilder::Apply( // the block. is_forwarding = false; } else { - int op_dev_id = GetOpDeviceID(node.get()); + int op_dev_id = GetOpDeviceID(node); if (op_dev_id != -1) { // This op only runs on one specific device. - CreateComputationalOp(&result, node.get(), op_dev_id); + CreateComputationalOp(&result, node, op_dev_id); for (ir::Node *n : node->outputs) { var_name_on_devices_.emplace(n->Name(), op_dev_id); } } else { // This op runs on all devices, and its output may have parameter's // gradients. + // TODO(paddle-dev): Why is so special about "read" op? if (node->Op()->Type() == "read" && strategy_.enable_data_balance_) { node->Op()->SetAttr("throw_eof_exp", false); - CreateComputationalOps(&result, node.get(), places_.size()); - // TODO(paddle-dev): builder shouldn't depend on the out logic of - // a specific op. + CreateComputationalOps(&result, node, places_.size()); const auto &data_var_names = node->Op()->Output("Out"); InsertDataBalanceOp(&result, data_var_names); } else { - CreateComputationalOps(&result, node.get(), places_.size()); + CreateComputationalOps(&result, node, places_.size()); } if (!is_forwarding && places_.size() > 1) { @@ -322,17 +369,17 @@ std::unique_ptr MultiDevSSAGraphBuilder::Apply( } } } - /* - Dependency graph has been constructed. However, there are still data - hazards need to be handled. - */ + Dependency graph has been constructed. However, there are still data + hazards need to be handled. + */ PolishGraphToSupportDataHazards(&result); /* * Only variables should be the leaves of graph. 
*/ AddOutputToLeafOps(&result); + PADDLE_ENFORCE(!ir::HasCircle(result)); return graph; } @@ -357,7 +404,7 @@ void MultiDevSSAGraphBuilder::SetCommunicationContext( #endif } -void MultiDevSSAGraphBuilder::CreateBroadcastOp(Graph *result, +void MultiDevSSAGraphBuilder::CreateBroadcastOp(ir::Graph *result, const std::string &p_name, size_t src_dev_id) const { #ifdef PADDLE_WITH_CUDA @@ -387,7 +434,7 @@ void MultiDevSSAGraphBuilder::CreateBroadcastOp(Graph *result, } } -void MultiDevSSAGraphBuilder::CreateComputationalOp(Graph *result, +void MultiDevSSAGraphBuilder::CreateComputationalOp(ir::Graph *result, ir::Node *node, int dev_id) const { result->Get("ops").emplace_back( @@ -396,7 +443,7 @@ void MultiDevSSAGraphBuilder::CreateComputationalOp(Graph *result, CreateOpHandleIOs(result, node, dev_id); } -void MultiDevSSAGraphBuilder::InsertAllReduceOp(Graph *result, +void MultiDevSSAGraphBuilder::InsertAllReduceOp(ir::Graph *result, const std::string &og) const { #ifdef PADDLE_WITH_CUDA result->Get("ops").emplace_back(new AllReduceOpHandle( @@ -426,7 +473,7 @@ void MultiDevSSAGraphBuilder::InsertAllReduceOp(Graph *result, } void MultiDevSSAGraphBuilder::InsertDataBalanceOp( - Graph *result, const std::vector &datas) const { + ir::Graph *result, const std::vector &datas) const { #ifdef PADDLE_WITH_CUDA result->Get("ops").emplace_back(new DataBalanceOpHandle( result->CreateEmptyNode("data_balance", ir::Node::Type::kOperation), @@ -479,8 +526,8 @@ int MultiDevSSAGraphBuilder::GetOpDeviceID(ir::Node *node) const { PADDLE_ENFORCE_EQ(param_grad.size(), 2U); int dev_id = GetVarDeviceID(param_grad[1]); - PADDLE_ENFORCE_NE(dev_id, -1, "dev_id should not be -1.[%s, %s]", - node->Op()->Type(), param_grad[0]); + PADDLE_ENFORCE_NE(dev_id, -1, "dev_id should not be -1.[%s, %s, %s]", + node->Op()->Type(), param_grad[0], param_grad[1]); return dev_id; } @@ -489,7 +536,7 @@ int MultiDevSSAGraphBuilder::GetVarDeviceID(const std::string &varname) const { return got == var_name_on_devices_.end() ? -1 : got->second; } -void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(Graph *result) const { +void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(ir::Graph *result) const { for (size_t i = 0; i < places_.size(); ++i) { // Insert ScaleCost OpHandle #ifdef PADDLE_WITH_CUDA @@ -519,7 +566,7 @@ void MultiDevSSAGraphBuilder::CreateScaleLossGradOp(Graph *result) const { } } -void MultiDevSSAGraphBuilder::CreateComputationalOps(Graph *result, +void MultiDevSSAGraphBuilder::CreateComputationalOps(ir::Graph *result, ir::Node *node, size_t num_places) const { for (size_t scope_idx = 0; scope_idx < num_places; ++scope_idx) { @@ -531,7 +578,7 @@ void MultiDevSSAGraphBuilder::CreateComputationalOps(Graph *result, } } -VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(Graph *result, +VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(ir::Graph *result, const std::string &og, int dst_dev_id) const { #ifdef PADDLE_WITH_CUDA @@ -564,12 +611,11 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(Graph *result, // Find the first occurence of `prev_op_name` and make current `op` depend // on it. 
-void MultiDevSSAGraphBuilder::ConnectOp(Graph *result, OpHandleBase *op, +void MultiDevSSAGraphBuilder::ConnectOp(ir::Graph *result, OpHandleBase *op, const std::string &prev_op_name) const { for (auto &prev_op : result->Get("ops")) { if (prev_op->Name() == prev_op_name) { - auto *dep_var = new DummyVarHandle( - result->CreateEmptyNode("dummy", ir::Node::Type::kVariable)); + auto *dep_var = new DummyVarHandle(result->CreateControlDepVar()); prev_op->AddOutput(dep_var); result->Get("dep_vars").emplace(dep_var); op->AddInput(dep_var); @@ -577,7 +623,7 @@ void MultiDevSSAGraphBuilder::ConnectOp(Graph *result, OpHandleBase *op, } } -void MultiDevSSAGraphBuilder::CreateDistTrainOp(Graph *result, +void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, ir::Node *node) const { int op_dev_id = -1; std::vector input_var_names; @@ -591,6 +637,7 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(Graph *result, if (node->Op()->Type() == "split_byref" || node->Op()->Type() == "split_selected_rows") { + // TODO(paddle-dev): getting the first var is not safe. op_dev_id = GetVarDeviceID(input_var_names[0]); if (strategy_.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce) { op_dev_id = GetAppropriateDeviceID(input_var_names); @@ -624,10 +671,14 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(Graph *result, } // Create RPC related op handles that connects its in ops and out ops. -void MultiDevSSAGraphBuilder::CreateRPCOp(Graph *result, ir::Node *node) const { +void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result, + ir::Node *node) const { int op_dev_id = -1; if (node->Op()->Type() == "send") { + // TODO(paddle-dev): getting the first var is not safe. op_dev_id = GetVarDeviceID(node->inputs[0]->Name()); + PADDLE_ENFORCE(!ir::IsControlDepVar(*node->inputs[0]), + "This hack no longer holds, please fix."); // the variable name which contains .block means it was splited by // split_byref op // so that we can balance the variable blocks to all the pserver diff --git a/paddle/fluid/framework/details/multi_devices_graph_builder.h b/paddle/fluid/framework/details/multi_devices_graph_builder.h index 2b7f4f586b4e750fde9245286c977258a9db6086..55076f227b5ab56d66b5053173c9e915da23b15f 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_builder.h +++ b/paddle/fluid/framework/details/multi_devices_graph_builder.h @@ -46,11 +46,13 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::vector &local_scopes, const BuildStrategy &strategy); #endif - std::unique_ptr Apply(std::unique_ptr graph) const override; + std::unique_ptr Apply( + std::unique_ptr graph) const override; int GetVarDeviceID(const std::string &varname) const override; private: - void CreateOpHandleIOs(Graph *result, ir::Node *node, size_t device_id) const; + void CreateOpHandleIOs(ir::Graph *result, ir::Node *node, + size_t device_id) const; private: std::string loss_var_name_; @@ -64,8 +66,8 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { bool IsScaleLossOp(ir::Node *node) const; - void CreateRPCOp(Graph *result, ir::Node *node) const; - void CreateDistTrainOp(Graph *result, ir::Node *node) const; + void CreateRPCOp(ir::Graph *result, ir::Node *node) const; + void CreateDistTrainOp(ir::Graph *result, ir::Node *node) const; /** * Is this operator as the end-point operator before/after send operator. 
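The hunks above replace the old `"dummy"` placeholder variables with `Graph::CreateControlDepVar()` and detect them with `ir::IsControlDepVar()` instead of comparing debug strings. The idea is that a control-dependency variable carries no data; it is only wired between two operator nodes so that dependency analysis and topology sorting keep them ordered, and it is recognized by the `__control_var` marker in its name (the patch defines this as `ir::Node::kControlDepVarName`). A simplified, self-contained sketch of that wiring follows; the `Node` struct and `ConnectWithControlDep` helper are illustrative stand-ins, not the actual `ir::Node`/`ir::Graph` classes:

```cpp
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Stand-in for ir::Node, just enough to show the wiring.
struct Node {
  std::string name;
  std::vector<Node*> inputs;
  std::vector<Node*> outputs;
};

// Same marker string the patch introduces as ir::Node::kControlDepVarName.
static const char kControlDepVarName[] = "__control_var";

// Wire a data-free dependency variable between `prev` and `next` so that
// any topology sort schedules `prev` before `next`.
Node* ConnectWithControlDep(std::vector<std::unique_ptr<Node>>* owner,
                            Node* prev, Node* next) {
  owner->emplace_back(new Node{std::string(kControlDepVarName) + "@" +
                                   std::to_string(owner->size()),
                               {}, {}});
  Node* dep = owner->back().get();
  prev->outputs.push_back(dep);
  dep->inputs.push_back(prev);
  dep->outputs.push_back(next);
  next->inputs.push_back(dep);
  return dep;
}

// Mirrors ir::IsControlDepVar: control-dep vars are identified by name.
bool IsControlDepVar(const Node& var) {
  return var.name.find(kControlDepVarName) != std::string::npos;
}

int main() {
  std::vector<std::unique_ptr<Node>> nodes;
  nodes.emplace_back(new Node{"send", {}, {}});
  nodes.emplace_back(new Node{"recv", {}, {}});
  Node* dep = ConnectWithControlDep(&nodes, nodes[0].get(), nodes[1].get());
  std::cout << dep->name << " is control dep: " << IsControlDepVar(*dep)
            << std::endl;
  return 0;
}
```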
@@ -74,21 +76,22 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { const std::vector &recv_vars) const; std::vector FindDistTrainSendVars( - const std::vector> &nodes) const; + const std::vector &nodes) const; std::vector FindDistTrainRecvVars( - const std::vector> &nodes) const; + const std::vector &nodes) const; - void ConnectOp(Graph *result, OpHandleBase *op, + void ConnectOp(ir::Graph *result, OpHandleBase *op, const std::string &prev_op_name) const; - void CreateComputationalOps(Graph *result, ir::Node *node, + void CreateComputationalOps(ir::Graph *result, ir::Node *node, size_t num_places) const; - void CreateScaleLossGradOp(Graph *result) const; - VarHandle *CreateReduceOp(Graph *result, const std::string &og, + void CreateScaleLossGradOp(ir::Graph *result) const; + VarHandle *CreateReduceOp(ir::Graph *result, const std::string &og, int dst_dev_id) const; - void CreateComputationalOp(Graph *result, ir::Node *node, int dev_id) const; + void CreateComputationalOp(ir::Graph *result, ir::Node *node, + int dev_id) const; bool IsParameterGradientOnce( const std::string &og, @@ -96,12 +99,12 @@ class MultiDevSSAGraphBuilder : public SSAGraphBuilder { int GetOpDeviceID(ir::Node *node) const; - void InsertAllReduceOp(Graph *result, const std::string &og) const; + void InsertAllReduceOp(ir::Graph *result, const std::string &og) const; - void InsertDataBalanceOp(Graph *result, + void InsertDataBalanceOp(ir::Graph *result, const std::vector &datas) const; - void CreateBroadcastOp(Graph *result, const std::string &p_name, + void CreateBroadcastOp(ir::Graph *result, const std::string &p_name, size_t src_dev_id) const; bool IsSparseGradient(const std::string &og) const; diff --git a/paddle/fluid/framework/details/rpc_op_handle.cc b/paddle/fluid/framework/details/rpc_op_handle.cc index 924ff4d118a192a43e5828a38fd1abbaac1a8526..f44b374edb29228dff5a8bf003d945291f166d49 100644 --- a/paddle/fluid/framework/details/rpc_op_handle.cc +++ b/paddle/fluid/framework/details/rpc_op_handle.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/framework/details/rpc_op_handle.h" +#include "paddle/fluid/framework/ir/graph.h" namespace paddle { namespace framework { @@ -33,7 +34,7 @@ void RPCOpHandle::RunImpl() { for (auto *in : inputs_) { auto &p = static_cast(in)->place_; // FIXME(Yancey1989): need a better solution instead of use DebugString() - if (in->DebugString() == "dummy") { // HACK + if (ir::IsControlDepVar(*in->Node())) { // HACK continue; } if (in->GeneratedOp()) { diff --git a/paddle/fluid/framework/details/ssa_graph_builder.cc b/paddle/fluid/framework/details/ssa_graph_builder.cc index 7bc130ef6e8d2e0caf6e445d12950b87e6dd4dbd..506e7eb35cd977869424223cb863dd64dbaa9d30 100644 --- a/paddle/fluid/framework/details/ssa_graph_builder.cc +++ b/paddle/fluid/framework/details/ssa_graph_builder.cc @@ -17,7 +17,7 @@ namespace paddle { namespace framework { namespace details { -void SSAGraphBuilder::PolishGraphToSupportDataHazards(Graph *graph) { +void SSAGraphBuilder::PolishGraphToSupportDataHazards(ir::Graph *graph) { for (auto &var_map : graph->Get("vars")) { for (auto &name_pair : var_map) { if (name_pair.second.size() <= 1) { @@ -36,9 +36,18 @@ void SSAGraphBuilder::PolishGraphToSupportDataHazards(Graph *graph) { // Read Write is the same op. 
continue; } + bool has_dep = false; + for (auto *r_out : read_op->Outputs()) { + for (auto *w_in : write_op->Inputs()) { + if (r_out->Node() == w_in->Node()) { + has_dep = true; + break; + } + } + } + if (has_dep) continue; - auto *dep_var = new DummyVarHandle( - graph->CreateEmptyNode("dummy", ir::Node::Type::kVariable)); + auto *dep_var = new DummyVarHandle(graph->CreateControlDepVar()); read_op->AddOutput(dep_var); write_op->AddInput(dep_var); graph->Get("dep_vars").emplace(dep_var); @@ -49,7 +58,7 @@ void SSAGraphBuilder::PolishGraphToSupportDataHazards(Graph *graph) { } VarHandle *SSAGraphBuilder::CreateOrGetLatestVarHandle( - Graph *graph, ir::Node *node, const platform::Place &place, + ir::Graph *graph, ir::Node *node, const platform::Place &place, size_t place_offset) { auto &var_holders = graph->Get("vars")[place_offset]; auto &var_holder = var_holders[node->Name()]; @@ -70,7 +79,7 @@ VarHandle *SSAGraphBuilder::CreateOrGetLatestVarHandle( return var; } -void SSAGraphBuilder::CreateOpOutput(Graph *graph, OpHandleBase *op_handle, +void SSAGraphBuilder::CreateOpOutput(ir::Graph *graph, OpHandleBase *op_handle, ir::Node *new_node, const platform::Place &place, size_t place_offset) { @@ -82,13 +91,12 @@ void SSAGraphBuilder::CreateOpOutput(Graph *graph, OpHandleBase *op_handle, op_handle->AddOutput(var); } -void SSAGraphBuilder::AddOutputToLeafOps(Graph *graph) { +void SSAGraphBuilder::AddOutputToLeafOps(ir::Graph *graph) { for (auto &op : graph->Get("ops")) { if (!op->Outputs().empty()) { continue; } - auto *dummy_leaf = new DummyVarHandle( - graph->CreateEmptyNode("dummy", ir::Node::Type::kVariable)); + auto *dummy_leaf = new DummyVarHandle(graph->CreateControlDepVar()); graph->Get("dep_vars").emplace(dummy_leaf); op->AddOutput(dummy_leaf); } diff --git a/paddle/fluid/framework/details/ssa_graph_builder.h b/paddle/fluid/framework/details/ssa_graph_builder.h index e8e8acdb38f893302fb92c47d6f1cb2d38453e0f..2b4f31f2ff3444f909e3be5eb810ae6737e237b2 100644 --- a/paddle/fluid/framework/details/ssa_graph_builder.h +++ b/paddle/fluid/framework/details/ssa_graph_builder.h @@ -57,26 +57,23 @@ class SSAGraphBuilder : public ir::Pass { DISABLE_COPY_AND_ASSIGN(SSAGraphBuilder); protected: - /** - * We only handle write after read(WAR), since it should not have a write - * after write in program. If there are write after write operators, we need - * prune them. - * - * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR) - */ - static void PolishGraphToSupportDataHazards(Graph *graph); - - static VarHandle *CreateOrGetLatestVarHandle(Graph *graph, ir::Node *node, + /* + Dependency graph has been constructed. However, there are still data + hazards need to be handled. 
+ */ + static void PolishGraphToSupportDataHazards(ir::Graph *graph); + + static VarHandle *CreateOrGetLatestVarHandle(ir::Graph *graph, ir::Node *node, const platform::Place &place, size_t place_offset); // Add an output variable (each_var_name, place, place_offset) to op_handle, // which belongs to graph - static void CreateOpOutput(Graph *graph, OpHandleBase *op_handle, + static void CreateOpOutput(ir::Graph *graph, OpHandleBase *op_handle, ir::Node *new_node, const platform::Place &place, size_t place_offset); - static void AddOutputToLeafOps(Graph *graph); + static void AddOutputToLeafOps(ir::Graph *graph); }; } // namespace details } // namespace framework diff --git a/paddle/fluid/framework/details/ssa_graph_checker.cc b/paddle/fluid/framework/details/ssa_graph_checker.cc index 7c79d7f1e881c67514634d56caa715c41927dbce..0438b096109a287366610d06ef2bd14c765a8f43 100644 --- a/paddle/fluid/framework/details/ssa_graph_checker.cc +++ b/paddle/fluid/framework/details/ssa_graph_checker.cc @@ -20,7 +20,7 @@ namespace paddle { namespace framework { namespace details { -bool SSAGraghBuilderWithChecker::IsValidGraph(const Graph *graph) const { +bool SSAGraghBuilderWithChecker::IsValidGraph(const ir::Graph *graph) const { std::unordered_map pending_ops; std::unordered_set pending_vars; std::unordered_set ready_vars; diff --git a/paddle/fluid/framework/details/ssa_graph_checker.h b/paddle/fluid/framework/details/ssa_graph_checker.h index 2e397e86825a41765a360d31fa8595d17027f3ec..51ce6e5ecad755613551aa6525b5cfbe4a8933ef 100644 --- a/paddle/fluid/framework/details/ssa_graph_checker.h +++ b/paddle/fluid/framework/details/ssa_graph_checker.h @@ -28,7 +28,8 @@ class SSAGraghBuilderWithChecker : public SSAGraphBuilder { std::unique_ptr&& builder) : builder_(std::move(builder)) {} - std::unique_ptr Apply(std::unique_ptr graph) const override { + std::unique_ptr Apply( + std::unique_ptr graph) const override { auto new_graph = builder_->Apply(std::move(graph)); PADDLE_ENFORCE(IsValidGraph(new_graph.get())); return new_graph; @@ -38,7 +39,7 @@ class SSAGraghBuilderWithChecker : public SSAGraphBuilder { return builder_->GetVarDeviceID(var_name); } - bool IsValidGraph(const Graph* graph) const; + bool IsValidGraph(const ir::Graph* graph) const; private: std::unique_ptr builder_; diff --git a/paddle/fluid/framework/details/ssa_graph_printer.cc b/paddle/fluid/framework/details/ssa_graph_printer.cc index 6dd6fd262e35a192ba85eb3aa16660526d2ebca2..20aab1464400aa9bb1bd6af11c06269c688a8308 100644 --- a/paddle/fluid/framework/details/ssa_graph_printer.cc +++ b/paddle/fluid/framework/details/ssa_graph_printer.cc @@ -21,7 +21,7 @@ namespace framework { namespace details { template -static inline void IterAllVar(const Graph &graph, Callback callback) { +static inline void IterAllVar(const ir::Graph &graph, Callback callback) { for (auto &each : graph.Get("vars")) { for (auto &pair1 : each) { for (auto &pair2 : pair1.second) { @@ -35,7 +35,7 @@ static inline void IterAllVar(const Graph &graph, Callback callback) { } } -void GraphvizSSAGraphPrinter::Print(const Graph &graph, +void GraphvizSSAGraphPrinter::Print(const ir::Graph &graph, std::ostream &sout) const { size_t var_id = 0; std::unordered_map vars; diff --git a/paddle/fluid/framework/details/ssa_graph_printer.h b/paddle/fluid/framework/details/ssa_graph_printer.h index cd72162f44ca76aa6340606cf79a73601eae89af..a77c1bad3f15bca9064ded860696eb68b033b090 100644 --- a/paddle/fluid/framework/details/ssa_graph_printer.h +++ 
b/paddle/fluid/framework/details/ssa_graph_printer.h @@ -25,12 +25,12 @@ namespace details { class SSAGraphPrinter { public: virtual ~SSAGraphPrinter() {} - virtual void Print(const Graph& graph, std::ostream& sout) const = 0; + virtual void Print(const ir::Graph& graph, std::ostream& sout) const = 0; }; class GraphvizSSAGraphPrinter : public SSAGraphPrinter { public: - void Print(const Graph& graph, std::ostream& sout) const override; + void Print(const ir::Graph& graph, std::ostream& sout) const override; }; class SSAGraghBuilderWithPrinter : public SSAGraphBuilder { @@ -50,7 +50,8 @@ class SSAGraghBuilderWithPrinter : public SSAGraphBuilder { stream_ptr_(std::move(sout)), stream_ref_(*stream_ptr_) {} - std::unique_ptr Apply(std::unique_ptr graph) const override { + std::unique_ptr Apply( + std::unique_ptr graph) const override { auto new_graph = builder_->Apply(std::move(graph)); printer_->Print(*new_graph, stream_ref_); return new_graph; diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index f85c62dd6c4a8033a037b1e001ece6a9cc90ca98..c19f74476f9a1498a7d61f5faf204e9966aea155 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -21,7 +21,8 @@ namespace framework { namespace details { ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, - const std::vector &places, std::unique_ptr &&graph) + const std::vector &places, + std::unique_ptr &&graph) : graph_(std::move(graph)), pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) : nullptr), diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index bf7c0a367a19ff4ac9462334516f1577672faa68..3d67daa45e20fdea52689684397ad01f2f4cd783 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -40,7 +40,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { ThreadedSSAGraphExecutor(const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, - std::unique_ptr &&graph); + std::unique_ptr &&graph); // Run a SSAGraph by a thread pool // Use topological sort algorithm @@ -53,7 +53,7 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { details::OpHandleBase *op); private: - std::unique_ptr graph_; + std::unique_ptr graph_; std::unique_ptr<::ThreadPool> pool_; std::vector local_scopes_; std::vector places_; diff --git a/paddle/fluid/framework/details/var_handle.cc b/paddle/fluid/framework/details/var_handle.cc index 6f00abd9473a84a77ed1a39015e2ae079e00be79..5457870e9ff5d7cf67c9c7076b9aae94eeada779 100644 --- a/paddle/fluid/framework/details/var_handle.cc +++ b/paddle/fluid/framework/details/var_handle.cc @@ -26,7 +26,7 @@ std::string VarHandle::DebugString() const { return ss.str(); } -std::string DummyVarHandle::DebugString() const { return "dummy"; } +std::string DummyVarHandle::DebugString() const { return node_->Name(); } } // namespace details } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/executor.cc b/paddle/fluid/framework/executor.cc index 84f67fafa19ac545ebb7a1019059e3c74c363c56..c2800c972a5501859672fbfd6921499e84d09cb0 100644 --- a/paddle/fluid/framework/executor.cc +++ b/paddle/fluid/framework/executor.cc @@ -45,19 +45,13 @@ 
ExecutorPrepareContext::~ExecutorPrepareContext() { Executor::Executor(const platform::Place& place) : place_(place) {} +void Executor::Close() { #ifdef PADDLE_WITH_DISTRIBUTE -void Executor::BeginPass() { ::paddle::operators::distributed::RPCClient::GetInstance< ::paddle::operators::distributed::GRPCClient>() - ->SendBeginPass(); -} - -void Executor::EndPass() { - ::paddle::operators::distributed::RPCClient::GetInstance< - ::paddle::operators::distributed::GRPCClient>() - ->SendEndPass(); -} + ->SendComplete(); #endif +} void InitializeVariable(Variable* var, proto::VarType::Type var_type) { if (var_type == proto::VarType::LOD_TENSOR) { diff --git a/paddle/fluid/framework/executor.h b/paddle/fluid/framework/executor.h index 563a4b2bb65dad481a755f67c7f23939816ce8e8..214ca3dc492c31d4c683790a6ae051be467401c9 100644 --- a/paddle/fluid/framework/executor.h +++ b/paddle/fluid/framework/executor.h @@ -44,17 +44,11 @@ class Executor { explicit Executor(const platform::Place& place); -#ifdef PADDLE_WITH_DISTRIBUTE /* - * Sending signal to pserver to mark current pass started. + * Close this Executor. + * Calling this method will send complete messages to all pserver instances. */ - void BeginPass(); - - /* - * Sending signal to pserver to mark current pass finished. - */ - void EndPass(); -#endif + void Close(); /* @Brief * Runtime evaluation of the given ProgramDesc under certain Scope diff --git a/paddle/fluid/framework/ir/CMakeLists.txt b/paddle/fluid/framework/ir/CMakeLists.txt index ee0604383ec9df826fa2abaef1f643ba0da6a096..6447452ae58344273fe569c91168c7c95a901c8d 100644 --- a/paddle/fluid/framework/ir/CMakeLists.txt +++ b/paddle/fluid/framework/ir/CMakeLists.txt @@ -1,5 +1,6 @@ cc_library(node SRCS node.cc DEPS proto_desc) cc_library(graph SRCS graph.cc DEPS node) +cc_library(graph_helper SRCS graph_helper.cc DEPS graph) cc_library(pass SRCS pass.cc DEPS graph node) - -cc_test(graph_test SRCS graph_test.cc DEPS graph proto_desc op_registry) +cc_test(graph_test SRCS graph_test.cc DEPS graph op_registry) +cc_test(graph_helper_test SRCS graph_helper_test.cc DEPS graph_helper op_registry) diff --git a/paddle/fluid/framework/ir/graph.cc b/paddle/fluid/framework/ir/graph.cc index e4021aa92b6da2343b604fb7bc01d31edb97d842..740acfafb7594d8d9f3ca5439323ce76c5ed271a 100644 --- a/paddle/fluid/framework/ir/graph.cc +++ b/paddle/fluid/framework/ir/graph.cc @@ -12,14 +12,18 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ +#include +#include + #include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/op_proto_maker.h" #include "paddle/fluid/framework/program_desc.h" #include "paddle/fluid/framework/var_desc.h" namespace paddle { namespace framework { +namespace ir { -// NOTE(paddle-dev): This graph contains circle. Graph::Graph(const ProgramDesc &program) : program_(program) { VLOG(3) << "block in program:" << program_.Size(); std::unordered_map all_vars; @@ -27,40 +31,87 @@ Graph::Graph(const ProgramDesc &program) : program_(program) { all_vars.emplace(var->Name(), var); } - std::map var_nodes; + std::map> var_nodes; for (auto *op : program.Block(0).AllOps()) { ir::Node *node = CreateOpNode(op); - + // For input args, reuse the same var name if it was created before. + // Otherwise, create a new one. 
for (auto &each_var_name : op->InputArgumentNames()) { ir::Node *var = nullptr; if (var_nodes.find(each_var_name) != var_nodes.end()) { - var = var_nodes.at(each_var_name); + var = var_nodes.at(each_var_name).back(); } else if (all_vars.count(each_var_name) != 0) { var = CreateVarNode(all_vars.at(each_var_name)); - var_nodes[each_var_name] = var; + var_nodes[each_var_name].push_back(var); } else { - // TODO(paddle-dev): Seems some assumption doesn't hold? - VLOG(3) << op->Type() - << " input var not in all_var list: " << each_var_name; + // Operation input var can be optional (dispensable). Which means + // the operation doesn't really need the var at runtime. In this + // case, the no-existed var is ready at the beginning. var = CreateEmptyNode(each_var_name, ir::Node::Type::kVariable); - var_nodes[each_var_name] = var; + var_nodes[each_var_name].push_back(var); } node->inputs.push_back(var); var->outputs.push_back(node); } - + // For output args, always create a new var. for (auto &each_var_name : op->OutputArgumentNames()) { - ir::Node *var = nullptr; - if (var_nodes.find(each_var_name) != var_nodes.end()) { - var = var_nodes.at(each_var_name); - } else { - var = CreateVarNode(all_vars.at(each_var_name)); - var_nodes[each_var_name] = var; - } + ir::Node *var = CreateVarNode(all_vars.at(each_var_name)); + var_nodes[each_var_name].push_back(var); node->outputs.push_back(var); var->inputs.push_back(node); } } + /** + * We only handle write after read(WAR), since it should not have a write + * after write in program. If there are write after write operators, we need + * prune them. + * + * https://en.wikipedia.org/wiki/Hazard_(computer_architecture)#Write_after_read_(WAR) + */ + + for (auto &var : var_nodes) { + auto &versions = var.second; + if (versions.size() <= 1) continue; + + auto it_new = versions.rbegin(); + auto it_old = versions.rbegin(); + ++it_old; + for (; it_old != versions.rend(); it_new = it_old, ++it_old) { + ir::Node *write_op = + (*it_new)->inputs.empty() ? nullptr : (*it_new)->inputs[0]; + const auto &read_ops = (*it_old)->outputs; + + for (auto *read_op : read_ops) { + // Manually add a dependency var from read_op to write_op; + if (read_op == write_op) { + // Read Write is the same op. + continue; + } + // 2 ops might have been connected via other vars. + bool has_dep = false; + for (ir::Node *r_out : read_op->outputs) { + for (ir::Node *w_in : write_op->inputs) { + if (r_out == w_in) { + has_dep = true; + break; + } + } + } + if (has_dep) continue; + + ir::Node *dep_var = CreateControlDepVar(); + read_op->outputs.push_back(dep_var); + dep_var->inputs.push_back(read_op); + write_op->inputs.push_back(dep_var); + dep_var->outputs.push_back(write_op); + } + } + } +} + +bool IsControlDepVar(const ir::Node &var) { + return var.Name().find(ir::Node::kControlDepVarName) != std::string::npos; } +} // namespace ir } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/graph.h b/paddle/fluid/framework/ir/graph.h index b4ac135b029005b723abca2cb9b9a9aa175eda40..4f59ec82a7d1217621c95d9a4a433a9af43e95da 100644 --- a/paddle/fluid/framework/ir/graph.h +++ b/paddle/fluid/framework/ir/graph.h @@ -26,13 +26,14 @@ limitations under the License. 
*/ namespace paddle { namespace framework { +namespace ir { class Graph { public: - explicit Graph(const ProgramDesc& program); + explicit Graph(const ProgramDesc &program); virtual ~Graph() { - for (auto& attr : attrs_) { + for (auto &attr : attrs_) { attr_dels_[attr.first](); } attrs_.clear(); @@ -40,12 +41,12 @@ class Graph { } template - AttrType& Get(const std::string& attr_name) const { - return *boost::any_cast(attrs_.at(attr_name)); + AttrType &Get(const std::string &attr_name) const { + return *boost::any_cast(attrs_.at(attr_name)); } template - void Set(const std::string& attr_name, AttrType* attr) { + void Set(const std::string &attr_name, AttrType *attr) { PADDLE_ENFORCE(attrs_.count(attr_name) == 0); attrs_[attr_name] = attr; attr_dels_[attr_name] = [attr, attr_name]() { @@ -54,29 +55,70 @@ class Graph { }; } - ir::Node* CreateVarNode(VarDesc* var_desc) { - nodes.emplace_back(new ir::Node(var_desc)); - return nodes.back().get(); + const std::unordered_set &Nodes() const { return node_set_; } + + // Create a normal variable with non-null VarDesc. + ir::Node *CreateVarNode(VarDesc *var_desc) { + return AddNode(new ir::Node(var_desc)); + } + + // Create a normal runnable operator with OpDesc. + ir::Node *CreateOpNode(OpDesc *op_desc) { + return AddNode(new ir::Node(op_desc)); } - ir::Node* CreateOpNode(OpDesc* op_desc) { - nodes.emplace_back(new ir::Node(op_desc)); - return nodes.back().get(); + // Create a control dependency var that connects 2 operations. The + // var doesn't hold any data. Other than that, it's no different from + // other var, considering dependency analysis. + ir::Node *CreateControlDepVar() { + // TODO(panyx0718): control var name should be really unique. + const std::string name = string::Sprintf( + "%s@%llu", ir::Node::kControlDepVarName, node_set_.size()); + return AddNode(new ir::Node(name, ir::Node::Type::kVariable)); } - ir::Node* CreateEmptyNode(const std::string& name, ir::Node::Type type) { - nodes.emplace_back(new ir::Node(name, type)); - return nodes.back().get(); + // A more free style way of creating a graph node. Mostly use for test + // or "copy" from another node. Avoid using it if possible. + ir::Node *CreateEmptyNode(const std::string &name, ir::Node::Type type) { + return AddNode(new ir::Node(name, type)); } - std::vector> nodes; + // Clear all node information of the graph and return the ownership of the + // nodes. + std::vector> ReleaseNodes() { + std::vector> ret; + for (auto &n : nodes_) { + ret.emplace_back(n.second.release()); + } + nodes_.clear(); + node_set_.clear(); + return ret; + } private: + // This method takes ownership of `node`. + ir::Node *AddNode(ir::Node *node) { + PADDLE_ENFORCE(node_set_.find(node) == node_set_.end()); + nodes_[node].reset(node); + node_set_.insert(node); + return node; + } + + void RemoveNode(ir::Node *node) { + PADDLE_ENFORCE(node_set_.find(node) != node_set_.end()); + node_set_.erase(node); + nodes_.erase(node); + } + // NOTE: program_ shouldn't be exposed to user. 
- const ProgramDesc& program_; + const ProgramDesc &program_; std::map attrs_; std::map> attr_dels_; + std::map> nodes_; + std::unordered_set node_set_; }; +bool IsControlDepVar(const ir::Node &var); +} // namespace ir } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/graph_helper.cc b/paddle/fluid/framework/ir/graph_helper.cc new file mode 100644 index 0000000000000000000000000000000000000000..b1c19e6535150130822e9f48685241e62de5b064 --- /dev/null +++ b/paddle/fluid/framework/ir/graph_helper.cc @@ -0,0 +1,118 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#include +#include + +#include "paddle/fluid/framework/ir/graph_helper.h" + +namespace paddle { +namespace framework { +namespace ir { +namespace { +void SortHelper( + const std::map> &adj_list, + ir::Node *node, std::unordered_set *visited, + std::vector *ret) { + visited->insert(node); + + for (auto adj : adj_list.at(node)) { + if (visited->find(adj) == visited->end()) { + SortHelper(adj_list, adj, visited, ret); + } + } + + VLOG(3) << "topology sort insert: " << node->Name() + << reinterpret_cast(node) << " input " << node->inputs.size(); + ret->push_back(node); +} + +bool HasCircleHelper( + ir::Node *node, + const std::map> &adj_list, + std::unordered_set *visited, + std::unordered_set *in_trace) { + if (visited->find(node) == visited->end()) { + visited->insert(node); + in_trace->insert(node); + + for (ir::Node *in : adj_list.at(node)) { + if (visited->find(in) == visited->end() && + HasCircleHelper(in, adj_list, visited, in_trace)) { + return true; + } else if (in_trace->find(in) != in_trace->end()) { + return true; + } + } + } + in_trace->erase(node); + return false; +} + +bool HasCircleInternal( + const std::map> &adj_list) { + std::unordered_set visited; + std::unordered_set in_trace; + for (auto &adj : adj_list) { + if (HasCircleHelper(adj.first, adj_list, &visited, &in_trace)) { + return true; + } + } + return false; +} +} // namespace + +bool HasCircle(const Graph &graph) { + return HasCircleInternal(BuildOperationAdjList(graph)); +} + +std::vector TopologySortOperations(const Graph &graph) { + std::map> adj_list = + BuildOperationAdjList(graph); + PADDLE_ENFORCE(!HasCircleInternal(adj_list)); + std::unordered_set visited; + std::vector ret; + for (auto adj : adj_list) { + if (visited.find(adj.first) == visited.end()) { + SortHelper(adj_list, adj.first, &visited, &ret); + } + } + return ret; +} + +std::map> BuildOperationAdjList( + const Graph &graph) { + std::map> adj_list; + + for (auto &n : graph.Nodes()) { + if (n->NodeType() != ir::Node::Type::kOperation) continue; + if (adj_list.find(n) == adj_list.end()) { + adj_list[n] = std::unordered_set(); + } + for (auto &var : n->inputs) { + for (auto &adj_n : var->inputs) { + PADDLE_ENFORCE(adj_n->NodeType() == ir::Node::Type::kOperation); + adj_list[n].insert(adj_n); + VLOG(3) << "adj " << adj_n->Name() << reinterpret_cast(adj_n) + << " -> " << n->Name() << reinterpret_cast(n) + << " via " 
<< var->Name() << reinterpret_cast(var); + } + } + } + return adj_list; +} + +} // namespace ir +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/ir/graph_helper.h b/paddle/fluid/framework/ir/graph_helper.h new file mode 100644 index 0000000000000000000000000000000000000000..cd6c53a07f8f56781989739d995226bd02b3d3d0 --- /dev/null +++ b/paddle/fluid/framework/ir/graph_helper.h @@ -0,0 +1,40 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. */ + +#pragma once + +#include +#include +#include + +#include "paddle/fluid/framework/ir/graph.h" +#include "paddle/fluid/framework/ir/node.h" + +namespace paddle { +namespace framework { +namespace ir { +// Test if the graph contains circle. +bool HasCircle(const Graph &graph); + +// Topology Sort the operations in the graph from inputs to outputs. +// `graph` cannot contain circle. +std::vector TopologySortOperations(const Graph &graph); + +// Build an adjacency list of operations for the `graph`. +std::map> BuildOperationAdjList( + const Graph &graph); + +} // namespace ir +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/ir/graph_helper_test.cc b/paddle/fluid/framework/ir/graph_helper_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..b517442bb73f43bc1cb1d639b6c6cf004b28d4cf --- /dev/null +++ b/paddle/fluid/framework/ir/graph_helper_test.cc @@ -0,0 +1,125 @@ +/* Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
*/ + +#include "paddle/fluid/framework/ir/graph.h" +#include +#include "gtest/gtest.h" +#include "paddle/fluid/framework/ir/graph_helper.h" +#include "paddle/fluid/framework/program_desc.h" + +namespace paddle { +namespace framework { +namespace ir { + +void BuildCircleGraph(Graph* g) { + ir::Node* o1 = g->CreateEmptyNode("op1", Node::Type::kOperation); + ir::Node* v1 = g->CreateEmptyNode("var1", Node::Type::kVariable); + + o1->outputs.push_back(v1); + o1->inputs.push_back(v1); + v1->inputs.push_back(o1); + v1->outputs.push_back(o1); +} + +void BuildCircleGraph2(Graph* g) { + ir::Node* o1 = g->CreateEmptyNode("op1", Node::Type::kOperation); + ir::Node* o2 = g->CreateEmptyNode("op2", Node::Type::kOperation); + ir::Node* v1 = g->CreateEmptyNode("var1", Node::Type::kVariable); + ir::Node* v2 = g->CreateEmptyNode("var2", Node::Type::kVariable); + + o1->outputs.push_back(v1); + o2->inputs.push_back(v1); + v1->inputs.push_back(o1); + v1->outputs.push_back(o2); + + o2->outputs.push_back(v2); + o1->inputs.push_back(v2); + v2->inputs.push_back(o2); + v2->outputs.push_back(o1); +} + +void BuildNoCircleGraph(Graph* g) { + ir::Node* o1 = g->CreateEmptyNode("op1", Node::Type::kOperation); + ir::Node* o2 = g->CreateEmptyNode("op2", Node::Type::kOperation); + ir::Node* o3 = g->CreateEmptyNode("op3", Node::Type::kOperation); + ir::Node* o4 = g->CreateEmptyNode("op4", Node::Type::kOperation); + ir::Node* o5 = g->CreateEmptyNode("op5", Node::Type::kOperation); + ir::Node* v1 = g->CreateEmptyNode("var1", Node::Type::kVariable); + ir::Node* v2 = g->CreateEmptyNode("var2", Node::Type::kVariable); + ir::Node* v3 = g->CreateEmptyNode("var3", Node::Type::kVariable); + ir::Node* v4 = g->CreateEmptyNode("var4", Node::Type::kVariable); + + // o1->v1->o2 + o1->outputs.push_back(v1); + o2->inputs.push_back(v1); + v1->inputs.push_back(o1); + v1->outputs.push_back(o2); + // o2->v2->o3 + // o2->v2->o4 + o2->outputs.push_back(v2); + o3->inputs.push_back(v2); + o4->inputs.push_back(v2); + v2->inputs.push_back(o2); + v2->outputs.push_back(o3); + v2->outputs.push_back(o4); + // o2->v3->o5 + o2->outputs.push_back(v3); + o5->inputs.push_back(v3); + v3->inputs.push_back(o2); + v3->outputs.push_back(o5); + // o3-v4->o5 + o3->outputs.push_back(v4); + o5->inputs.push_back(v4); + v4->inputs.push_back(o3); + v4->outputs.push_back(o5); +} + +TEST(GraphHelperTest, Basic) { + ProgramDesc prog; + + Graph g(prog); + BuildCircleGraph(&g); + ASSERT_TRUE(HasCircle(g)); + + Graph g2(prog); + BuildCircleGraph2(&g2); + ASSERT_TRUE(HasCircle(g2)); + + auto adj_list = BuildOperationAdjList(g2); + for (auto& adj : adj_list) { + auto& adj_set = adj.second; + if (adj.first->Name() == "op1") { + ASSERT_EQ((*adj_set.begin())->Name(), "op2"); + } else if (adj.first->Name() == "op2") { + ASSERT_EQ((*adj_set.begin())->Name(), "op1"); + } else { + ASSERT_TRUE(false); + } + } + + Graph g3(prog); + BuildNoCircleGraph(&g3); + ASSERT_FALSE(HasCircle(g3)); + auto sorted = TopologySortOperations(g3); + std::map node_map; + for (size_t i = 0; i < sorted.size(); ++i) { + node_map[sorted[i]->Name()] = i; + } + ASSERT_EQ(node_map.at("op1"), 0); + ASSERT_EQ(node_map.at("op2"), 1); + ASSERT_TRUE(node_map.at("op3") < node_map.at("op5")); +} +} // namespace ir +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/ir/graph_test.cc b/paddle/fluid/framework/ir/graph_test.cc index 4e23bf124f8822e25be0f6b1c7c8c5de4e4f600a..73ef55756c330bdbc3be89c436967b2a88625a43 100644 --- a/paddle/fluid/framework/ir/graph_test.cc +++ 
b/paddle/fluid/framework/ir/graph_test.cc @@ -76,6 +76,7 @@ TEST(GraphTest, Basic) { op->SetType("sum"); op->SetInput("X", {"test_a", "test_b", "test_c"}); op->SetOutput("Out", {"test_out"}); + op->SetAttr("op_role", 1); prog.MutableBlock(0)->Var("test_a")->SetType(proto::VarType::SELECTED_ROWS); prog.MutableBlock(0)->Var("test_b")->SetType(proto::VarType::SELECTED_ROWS); @@ -92,21 +93,22 @@ TEST(GraphTest, Basic) { ASSERT_EQ(proto::VarType::LOD_TENSOR, prog.MutableBlock(0)->Var("test_out")->GetType()); - std::unique_ptr g(new Graph(prog)); - ASSERT_EQ(g->nodes[0]->Name(), "sum"); - ASSERT_EQ(g->nodes[0]->inputs[0]->Name(), "test_a"); - ASSERT_EQ(g->nodes[0]->inputs[1]->Name(), "test_b"); - ASSERT_EQ(g->nodes[0]->inputs[2]->Name(), "test_c"); - ASSERT_EQ(g->nodes[0]->outputs[0]->Name(), "test_out"); - ASSERT_EQ(g->nodes[1]->Name(), "test_a"); - ASSERT_EQ(g->nodes[1]->outputs[0]->Name(), "sum"); - ASSERT_EQ(g->nodes[2]->Name(), "test_b"); - ASSERT_EQ(g->nodes[2]->outputs[0]->Name(), "sum"); - ASSERT_EQ(g->nodes[3]->Name(), "test_c"); - ASSERT_EQ(g->nodes[3]->outputs[0]->Name(), "sum"); - ASSERT_EQ(g->nodes[4]->Name(), "test_out"); - ASSERT_EQ(g->nodes[4]->inputs[0]->Name(), "sum"); - ASSERT_EQ(g->nodes.size(), 5); + std::unique_ptr g(new ir::Graph(prog)); + std::vector nodes(g->Nodes().begin(), g->Nodes().end()); + for (ir::Node *n : nodes) { + if (n->Name() == "sum") { + ASSERT_EQ(n->inputs.size(), 3); + ASSERT_EQ(n->outputs.size(), 1); + } else if (n->Name() == "test_a" || n->Name() == "test_b" || + n->Name() == "test_c") { + ASSERT_EQ(n->inputs.size(), 0); + ASSERT_EQ(n->outputs.size(), 1); + } else if (n->Name() == "test_out") { + ASSERT_EQ(n->inputs.size(), 1); + ASSERT_EQ(n->outputs.size(), 0); + } + } + ASSERT_EQ(nodes.size(), 5); } } // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/node.cc b/paddle/fluid/framework/ir/node.cc index 86376e7e8bc8bee2ddbc18f7f24bcdd849a06cbf..aca77da8d674f29b89c023717cdcd061232d023a 100644 --- a/paddle/fluid/framework/ir/node.cc +++ b/paddle/fluid/framework/ir/node.cc @@ -15,5 +15,9 @@ limitations under the License. 
*/ #include "paddle/fluid/framework/ir/node.h" namespace paddle { -namespace framework {} // namespace framework +namespace framework { +namespace ir { +const char Node::kControlDepVarName[] = "__control_var"; +} // namespace ir +} // namespace framework } // namespace paddle diff --git a/paddle/fluid/framework/ir/node.h b/paddle/fluid/framework/ir/node.h index b98c29b81ddc2f57553b8fe76fcfeb0936ddd837..b3138fccee86fb274abe72007961fc1c982b1e96 100644 --- a/paddle/fluid/framework/ir/node.h +++ b/paddle/fluid/framework/ir/node.h @@ -27,6 +27,8 @@ namespace ir { class Node { public: enum class Type { kOperation, kVariable }; + static const char kControlDepVarName[]; + explicit Node(const std::string& name, Type type) : name_(name), var_desc_(nullptr), op_desc_(nullptr), type_(type) {} @@ -50,6 +52,7 @@ class Node { PADDLE_ENFORCE(type_ == Type::kVariable); return var_desc_; } + OpDesc* Op() { PADDLE_ENFORCE(type_ == Type::kOperation); return op_desc_; diff --git a/paddle/fluid/framework/mixed_vector.h b/paddle/fluid/framework/mixed_vector.h index 71bebeea637a7eb6e3bfddc0b2b641477b06bcdf..7836ecb1272a07a79a70c9cb040335f9a42e5684 100644 --- a/paddle/fluid/framework/mixed_vector.h +++ b/paddle/fluid/framework/mixed_vector.h @@ -16,6 +16,7 @@ #include #include +#include #include #include "paddle/fluid/framework/tensor.h" @@ -386,13 +387,14 @@ template class CPUVector : public std::vector> { public: CPUVector() : std::vector() {} - CPUVector(size_t count, const T &value = T()) + CPUVector(size_t count, const T &value = T()) // NOLINT : std::vector(count, value) {} CPUVector(std::initializer_list init) : std::vector(init) {} - CPUVector(const std::vector &other) : std::vector(other) {} - explicit CPUVector(const CPUVector &other) : std::vector(other) {} + CPUVector(const std::vector &other) : std::vector(other) {} // NOLINT + CPUVector(const CPUVector &other) : std::vector(other) {} CPUVector(CPUVector &&other) : std::vector(std::move(other)) {} - CPUVector(std::vector &&other) : std::vector(std::move(other)) {} + CPUVector(std::vector &&other) // NOLINT + : std::vector(std::move(other)) {} CPUVector &operator=(const CPUVector &other) { this->assign(other.begin(), other.end()); return *this; @@ -410,8 +412,6 @@ class CPUVector : public std::vector> { return os; } - void resize(size_t size) { this->resize(size); } - T &operator[](size_t id) { return this->at(id); } const T &operator[](size_t id) const { return this->at(id); } diff --git a/paddle/fluid/framework/mixed_vector_test.cc b/paddle/fluid/framework/mixed_vector_test.cc new file mode 100644 index 0000000000000000000000000000000000000000..0599c8d384641606b0a5ebb5ba1781b56f539e63 --- /dev/null +++ b/paddle/fluid/framework/mixed_vector_test.cc @@ -0,0 +1,72 @@ +/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
*/ + +#include + +#include "glog/logging.h" +#include "gtest/gtest.h" +#include "paddle/fluid/framework/mixed_vector.h" + +template +using vec = paddle::framework::Vector; + +TEST(mixed_vector, CPU_VECTOR) { + vec tmp; + for (int i = 0; i < 10; ++i) { + tmp.push_back(i); + } + ASSERT_EQ(tmp.size(), 10UL); + vec tmp2; + tmp2 = tmp; + ASSERT_EQ(tmp2.size(), 10UL); + for (int i = 0; i < 10; ++i) { + ASSERT_EQ(tmp2[i], i); + ASSERT_EQ(tmp2[i], tmp[i]); + } + int cnt = 0; + for (auto& t : tmp2) { + ASSERT_EQ(t, cnt); + ++cnt; + } +} + +TEST(mixed_vector, InitWithCount) { + paddle::framework::Vector vec(10, 10); + for (int i = 0; i < 10; ++i) { + ASSERT_EQ(vec[i], 10); + } +} + +TEST(mixed_vector, ForEach) { + vec tmp; + for (auto& v : tmp) { + VLOG(3) << v; + } +} + +TEST(mixed_vector, Reserve) { + paddle::framework::Vector vec; + vec.reserve(1); + vec.push_back(0); + vec.push_back(0); + vec.push_back(0); +} + +TEST(mixed_vector, Resize) { + paddle::framework::Vector vec; + vec.resize(1); + vec.push_back(0); + vec.push_back(0); + vec.push_back(0); +} diff --git a/paddle/fluid/framework/mixed_vector_test.cu b/paddle/fluid/framework/mixed_vector_test.cu index d57f82510833d6a0cea7009cf1f0b49543812f8d..4b0caa8d350dde0462e5fdcca743df919358a364 100644 --- a/paddle/fluid/framework/mixed_vector_test.cu +++ b/paddle/fluid/framework/mixed_vector_test.cu @@ -11,7 +11,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ + #include +#include #include "glog/logging.h" #include "gtest/gtest.h" @@ -21,26 +23,6 @@ template using vec = paddle::framework::Vector; -TEST(mixed_vector, CPU_VECTOR) { - vec tmp; - for (int i = 0; i < 10; ++i) { - tmp.push_back(i); - } - ASSERT_EQ(tmp.size(), 10UL); - vec tmp2; - tmp2 = tmp; - ASSERT_EQ(tmp2.size(), 10UL); - for (int i = 0; i < 10; ++i) { - ASSERT_EQ(tmp2[i], i); - ASSERT_EQ(tmp2[i], tmp[i]); - } - int cnt = 0; - for (auto& t : tmp2) { - ASSERT_EQ(t, cnt); - ++cnt; - } -} - static __global__ void multiply_10(int* ptr) { for (int i = 0; i < 10; ++i) { ptr[i] *= 10; @@ -91,24 +73,3 @@ TEST(mixed_vector, MultiGPU) { ASSERT_EQ(tmp[i], i * 100); } } - -TEST(mixed_vector, InitWithCount) { - paddle::framework::Vector vec(10, 10); - for (int i = 0; i < 10; ++i) { - ASSERT_EQ(vec[i], 10); - } -} - -TEST(mixed_vector, ForEach) { - vec tmp; - for (auto& v : tmp) { - } -} - -TEST(mixed_vector, Reserve) { - paddle::framework::Vector vec; - vec.reserve(1); - vec.push_back(0); - vec.push_back(0); - vec.push_back(0); -} diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 1e5bba62b53025dacdbf2d74b35f266cf4e422c2..02c836bea194553bb9c4bc5677cc408dd302e9ce 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -132,7 +132,7 @@ ParallelExecutor::ParallelExecutor( #endif } builder_ = builder_factory.Create(); - std::unique_ptr graph(new Graph(main_program)); + std::unique_ptr graph(new ir::Graph(main_program)); graph = builder_->Apply(std::move(graph)); member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, places, std::move(graph))); diff --git a/paddle/fluid/inference/api/api_impl.cc b/paddle/fluid/inference/api/api_impl.cc index 3ae255e13fc4f3ca28a6af62a5d5944d84303fc7..58fd7c6f8b05a846bd4a82068f09f5d9ef5a6516 100644 --- a/paddle/fluid/inference/api/api_impl.cc +++ b/paddle/fluid/inference/api/api_impl.cc 
@@ -137,6 +137,7 @@ bool NativePaddlePredictor::Run(const std::vector &inputs, executor_->RunPreparedContext( ctx_.get(), sub_scope_ != nullptr ? sub_scope_ : scope_.get(), &feed_targets, &fetch_targets, + false, /* don't create local scope each time*/ false /* don't create variable eatch time */); VLOG(4) << "Finish prepared context"; if (!GetFetch(fetchs, output_data)) { diff --git a/paddle/fluid/inference/tests/test_helper.h b/paddle/fluid/inference/tests/test_helper.h index 44c36b1683b037832a218df02184e7cd2ba143e9..695790a37dce889e838462b401ca4e89f09271d5 100644 --- a/paddle/fluid/inference/tests/test_helper.h +++ b/paddle/fluid/inference/tests/test_helper.h @@ -210,13 +210,14 @@ void TestInference(const std::string& dirname, // Ignore the profiling results of the first run std::unique_ptr ctx; + bool CreateLocalScope = CreateVars; if (PrepareContext) { ctx = executor.Prepare(*inference_program, 0); executor.RunPreparedContext(ctx.get(), scope, &feed_targets, - &fetch_targets, true, CreateVars); + &fetch_targets, CreateLocalScope, CreateVars); } else { executor.Run(*inference_program, scope, &feed_targets, &fetch_targets, - true, CreateVars); + CreateLocalScope, CreateVars); } // Enable the profiler @@ -232,10 +233,11 @@ void TestInference(const std::string& dirname, // Note: if you change the inference_program, you need to call // executor.Prepare() again to get a new ExecutorPrepareContext. executor.RunPreparedContext(ctx.get(), scope, &feed_targets, - &fetch_targets, CreateVars); + &fetch_targets, CreateLocalScope, + CreateVars); } else { executor.Run(*inference_program, scope, &feed_targets, &fetch_targets, - CreateVars); + CreateLocalScope, CreateVars); } } diff --git a/paddle/fluid/operators/CMakeLists.txt b/paddle/fluid/operators/CMakeLists.txt index 4e2002ad24415437ae4f85eba0e90a6c689e2996..9b56ad4c55e35d497aa7abe4e1da3867a2084b88 100644 --- a/paddle/fluid/operators/CMakeLists.txt +++ b/paddle/fluid/operators/CMakeLists.txt @@ -192,9 +192,9 @@ if(WITH_DISTRIBUTE) set(DISTRIBUTE_DEPS "") if(WITH_GRPC) - set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf) + set(DISTRIBUTE_DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf node) else() - set(DISTRIBUTE_DEPS sendrecvop_brpc brpc leveldb snappystream snappy protobuf ssl crypto zlib) + set(DISTRIBUTE_DEPS sendrecvop_brpc brpc leveldb snappystream snappy protobuf ssl crypto zlib node) if(WITH_BRPC_RDMA) find_library(IBVERBS_LIBRARY NAMES ibverbs) ADD_LIBRARY(ibverbs SHARED IMPORTED GLOBAL) diff --git a/paddle/fluid/operators/distributed/CMakeLists.txt b/paddle/fluid/operators/distributed/CMakeLists.txt index 6555b8101a90bba8351d2c82313ab12e572a01ee..1612927055dd4ec5ee2220bc2b285e8d9b640ea8 100644 --- a/paddle/fluid/operators/distributed/CMakeLists.txt +++ b/paddle/fluid/operators/distributed/CMakeLists.txt @@ -18,7 +18,7 @@ if(WITH_GRPC) set_source_files_properties(grpc_serde_test.cc rpc_server_test.cc PROPERTIES COMPILE_FLAGS ${DISTRIBUTE_COMPILE_FLAGS}) cc_test(grpc_serde_test SRCS grpc_serde_test.cc DEPS grpc++_unsecure grpc_unsecure gpr cares zlib protobuf sendrecvop_grpc scope profiler math_function SERIAL) - cc_test(grpc_server_test SRCS rpc_server_test.cc + cc_test(rpc_server_test SRCS rpc_server_test.cc DEPS sendrecvop_grpc grpc++_unsecure grpc_unsecure gpr cares zlib protobuf executor proto_desc lookup_table_op SERIAL) return() endif() diff --git a/paddle/fluid/operators/distributed/grpc_client.cc b/paddle/fluid/operators/distributed/grpc_client.cc index 
52c4bc1e7965323438de959d5eb1f3b4ef4f4cfe..265f964ddc682868c64669744b130aebbbf86692 100644 --- a/paddle/fluid/operators/distributed/grpc_client.cc +++ b/paddle/fluid/operators/distributed/grpc_client.cc @@ -36,20 +36,16 @@ void GRPCClient::InitEventLoop() { client_thread_.reset(new std::thread(std::bind(&GRPCClient::Proceed, this))); } -void GRPCClient::SendBeginPass() { - for (auto& it : channels_) { - VLOG(3) << "send begin pass to: " << it.first; - this->AsyncSendBeginPass(it.first); - } - this->Wait(); -} - -void GRPCClient::SendEndPass() { - for (auto& it : channels_) { - VLOG(3) << "send end pass to " << it.first; - this->AsyncSendEndPass(it.first); +void GRPCClient::SendComplete() { + std::unique_lock lk(completed_mutex_); + if (!completed_) { + for (auto& it : channels_) { + VLOG(3) << "send complete message to " << it.first; + this->AsyncSendComplete(it.first); + } + PADDLE_ENFORCE(this->Wait(), "internal grpc error"); + completed_ = true; } - this->Wait(); } GRPCClient::~GRPCClient() { @@ -239,32 +235,19 @@ void GRPCClient::AsyncSendFetchBarrier(const std::string& ep, req_count_++; } -void GRPCClient::AsyncSendBeginPass(const std::string& ep, int64_t time_out) { +void GRPCClient::AsyncSendComplete(const std::string& ep, int64_t time_out) { const auto ch = GetChannel(ep); BatchBarrierProcessor* s = new BatchBarrierProcessor(ch); s->Prepare(time_out); sendrecv::VariableMessage req; - req.set_varname(BEGIN_PASS_MESSAGE); + req.set_varname(COMPLETE_MESSAGE); auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_); rpc->Finish(&s->reply_, &s->status_, reinterpret_cast(s)); req_count_++; } -void GRPCClient::AsyncSendEndPass(const std::string& ep, int64_t time_out) { - const auto ch = GetChannel(ep); - - FetchBarrierProcessor* s = new FetchBarrierProcessor(ch); - s->Prepare(time_out); - - sendrecv::VariableMessage req; - req.set_varname(END_PASS_MESSAGE); - auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_); - rpc->Finish(&s->reply_, &s->status_, reinterpret_cast(s)); - req_count_++; -} - void GRPCClient::AsyncCheckpointNotify(const std::string& ep, const std::string& dir, int64_t time_out) { diff --git a/paddle/fluid/operators/distributed/grpc_client.h b/paddle/fluid/operators/distributed/grpc_client.h index 11de84d9e265b2ca75d6d72a1d1e8797763f96a5..8351d825f817437e1b3691e916952dd9a86af491 100644 --- a/paddle/fluid/operators/distributed/grpc_client.h +++ b/paddle/fluid/operators/distributed/grpc_client.h @@ -174,7 +174,7 @@ class CheckpointNotifyProcessor : public BaseProcessor { class GRPCClient : public RPCClient { public: - GRPCClient() : ok_(true) {} + GRPCClient() : ok_(true), completed_(false) {} virtual ~GRPCClient(); bool AsyncSendVar(const std::string& ep, const platform::DeviceContext& ctx, @@ -201,17 +201,12 @@ class GRPCClient : public RPCClient { void AsyncCheckpointNotify(const std::string& ep, const std::string& dir, int64_t time_out = FLAGS_rpc_deadline) override; - void AsyncSendBeginPass(const std::string& ep, - int64_t time_out = FLAGS_rpc_deadline) override; - - void AsyncSendEndPass(const std::string& ep, - int64_t time_out = FLAGS_rpc_deadline) override; + void AsyncSendComplete(const std::string& ep, + int64_t time_out = FLAGS_rpc_deadline) override; bool Wait() override; - void SendBeginPass() override; - - void SendEndPass() override; + void SendComplete() override; protected: void InitImpl() override; @@ -238,6 +233,10 @@ class GRPCClient : public RPCClient { // mutex for GetChannel thread safety std::mutex chan_mutex_; 
DISABLE_COPY_AND_ASSIGN(GRPCClient); + + // mutex for sending complete message only once + std::mutex completed_mutex_; + bool completed_; }; } // namespace distributed diff --git a/paddle/fluid/operators/distributed/request_handler.h b/paddle/fluid/operators/distributed/request_handler.h index 3d61171dff98d6752be98b4b90577bfd059525ab..64ac7281848f91302bc0aa3cb81dd198e56fb653 100644 --- a/paddle/fluid/operators/distributed/request_handler.h +++ b/paddle/fluid/operators/distributed/request_handler.h @@ -43,8 +43,6 @@ constexpr char kRequestPassBarrier[] = "RequestPassBarrier"; #define BATCH_BARRIER_MESSAGE "BATCH_BARRIER@RECV" #define FETCH_BARRIER_MESSAGE "FETCH_BARRIER@RECV" #define COMPLETE_MESSAGE "COMPLETE@RECV" -#define BEGIN_PASS_MESSAGE "BEGIN_PASS@RECV" -#define END_PASS_MESSAGE "END_PASS@RECV" #define CHECKPOINT_SAVE_MESSAGE "SAVE@CHECKPOINTNOTIFY" #define CHECKPOINT_LOAD_MESSAGE "LOAD@CHECKPOINTNOTIFY" diff --git a/paddle/fluid/operators/distributed/request_handler_impl.cc b/paddle/fluid/operators/distributed/request_handler_impl.cc index f1f84072d47e58eaa81dd66dc018e17b182bb57b..55995783c6eab10632ab2a5bca64ca856f000df1 100644 --- a/paddle/fluid/operators/distributed/request_handler_impl.cc +++ b/paddle/fluid/operators/distributed/request_handler_impl.cc @@ -55,10 +55,9 @@ bool RequestSendHandler::Handle(const std::string& varname, if (varname == BATCH_BARRIER_MESSAGE) { VLOG(3) << "sync: recv BATCH_BARRIER_MESSAGE"; rpc_server_->IncreaseBatchBarrier(kRequestSend); - } else if (varname == BEGIN_PASS_MESSAGE) { - VLOG(3) << "sync: recv begin pass message"; - rpc_server_->WaitCond(kRequestSend); - rpc_server_->BeginPass(); + } else if (varname == COMPLETE_MESSAGE) { + VLOG(3) << "sync: recv complete message"; + rpc_server_->Complete(); } else { VLOG(3) << "sync: received var_name: " << varname; rpc_server_->WaitCond(kRequestSend); @@ -94,14 +93,12 @@ bool RequestGetHandler::Handle(const std::string& varname, if (varname == FETCH_BARRIER_MESSAGE) { VLOG(3) << "sync: recv fetch barrier message"; rpc_server_->IncreaseBatchBarrier(kRequestGet); - } else if (varname == END_PASS_MESSAGE) { - rpc_server_->EndPass(); } else { rpc_server_->WaitCond(kRequestGet); *outvar = scope_->FindVar(varname); } } else { - if (varname != FETCH_BARRIER_MESSAGE && varname != END_PASS_MESSAGE) { + if (varname != FETCH_BARRIER_MESSAGE && varname != COMPLETE_MESSAGE) { *outvar = scope_->FindVar(varname); } } diff --git a/paddle/fluid/operators/distributed/rpc_client.h b/paddle/fluid/operators/distributed/rpc_client.h index 4d87376fbf776e29156b78d826f5012bc53460df..22a022a5d25e5c6628b80294494b87ca105a04c7 100644 --- a/paddle/fluid/operators/distributed/rpc_client.h +++ b/paddle/fluid/operators/distributed/rpc_client.h @@ -60,17 +60,13 @@ class RPCClient { const std::string& dir, int64_t time_out = FLAGS_rpc_deadline) = 0; - virtual void AsyncSendBeginPass(const std::string& ep, - int64_t time_out = FLAGS_rpc_deadline) = 0; + virtual void AsyncSendComplete(const std::string& ep, + int64_t time_out = FLAGS_rpc_deadline) = 0; - virtual void AsyncSendEndPass(const std::string& ep, - int64_t time_out = FLAGS_rpc_deadline) = 0; - - // BeginePass/EndPass tells all the pserver that start/end a pass, so that - // the pserver can increase/reduce it's barrier count, and continue to train + // Complete tells all the pserver instances that finishe the training, + // the pserver can reduce it's barrier count, and continue to train // with other trainers. 
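
Replacing the begin/end-pass barriers with a single `COMPLETE` message also simplifies the user-facing shutdown flow: a trainer signals once that it is finished, and each pserver only decrements its live-client count until no trainers remain. A rough trainer-side sketch of the new flow, assuming a usual fluid distributed setup (`train_reader` and the pass count are placeholders, not part of this change):

```python
import paddle.fluid as fluid

place = fluid.CPUPlace()
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())

for pass_id in range(10):                    # pass count: placeholder
    for feed_data in train_reader():         # train_reader: placeholder reader
        exe.run(fluid.default_main_program(), feed=feed_data)

# Tell every pserver that this trainer is finished. The executor cannot
# be used again afterwards; a second close() is a no-op.
exe.close()
```
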
- virtual void SendBeginPass() = 0; - virtual void SendEndPass() = 0; + virtual void SendComplete() = 0; virtual bool Wait() = 0; diff --git a/paddle/fluid/operators/distributed/rpc_server.cc b/paddle/fluid/operators/distributed/rpc_server.cc index d49ee34eeaf4e80f6fd4f8cdc548cc2b938d0f2a..83b14fa64d735d80f43bf55c798cddb2f3ea7032 100644 --- a/paddle/fluid/operators/distributed/rpc_server.cc +++ b/paddle/fluid/operators/distributed/rpc_server.cc @@ -64,18 +64,7 @@ void RPCServer::IncreaseBatchBarrier(const std::string rpc_name) { } } -void RPCServer::BeginPass() { - VLOG(4) << "RPCServer begin increase pass barrier"; - { - std::unique_lock lock(mutex_); - client_num_++; - VLOG(4) << "increase client_num to: " << client_num_; - } - barrier_cond_.notify_all(); -} - -void RPCServer::EndPass() { - VLOG(4) << "RPCServer begin increase pass barrier"; +void RPCServer::Complete() { { std::unique_lock lock(mutex_); client_num_--; @@ -87,6 +76,11 @@ void RPCServer::EndPass() { barrier_cond_.notify_all(); } +int RPCServer::GetClientNum() { + std::unique_lock lock(mutex_); + return client_num_; +} + void RPCServer::ResetBarrierCounter() { VLOG(3) << "RPCServer ResetBarrierCounter "; std::unique_lock lock(mutex_); diff --git a/paddle/fluid/operators/distributed/rpc_server.h b/paddle/fluid/operators/distributed/rpc_server.h index 833991c8aa6e7cfd10f2aa52f9218be7ff8ccebf..fd914d7a72e61bc9472876c433b65598ef5b1980 100644 --- a/paddle/fluid/operators/distributed/rpc_server.h +++ b/paddle/fluid/operators/distributed/rpc_server.h @@ -44,7 +44,7 @@ class RPCServer { int GetSelectedPort() const { return selected_port_; } - int GetClientNum() const; + int GetClientNum(); void SavePort() const; @@ -64,8 +64,7 @@ class RPCServer { void WaitCond(const std::string& rpc_name); void IncreaseBatchBarrier(const std::string rpc_name); - void BeginPass(); - void EndPass(); + void Complete(); void ResetBarrierCounter(); diff --git a/paddle/fluid/operators/distributed/rpc_server_test.cc b/paddle/fluid/operators/distributed/rpc_server_test.cc index a0693cffabcc561b0adfafc2c49027a890dd5efc..9f2360ec70d2ce5d4e16435595e109c1bf04fd13 100644 --- a/paddle/fluid/operators/distributed/rpc_server_test.cc +++ b/paddle/fluid/operators/distributed/rpc_server_test.cc @@ -91,7 +91,7 @@ void InitTensorsOnServer(framework::Scope* scope, platform::CPUPlace* place, } } -void StartServer() { +void StartServer(const std::string& rpc_name) { framework::ProgramDesc program; framework::Scope scope; platform::CPUPlace place; @@ -107,14 +107,14 @@ void StartServer() { std::shared_ptr> prefetch_var_name_to_prepared; prefetch_var_name_to_prepared[in_var_name] = prepared[0]; + g_req_handler->SetProgram(&program); g_req_handler->SetPrefetchPreparedCtx(&prefetch_var_name_to_prepared); g_req_handler->SetDevCtx(&ctx); g_req_handler->SetScope(&scope); g_req_handler->SetExecutor(&exe); - g_rpc_service->RegisterRPC(distributed::kRequestPrefetch, - g_req_handler.get()); + g_rpc_service->RegisterRPC(rpc_name, g_req_handler.get()); g_req_handler->SetRPCServer(g_rpc_service.get()); std::thread server_thread( @@ -129,7 +129,7 @@ TEST(PREFETCH, CPU) { distributed::RPCClient* client = distributed::RPCClient::GetInstance(); - std::thread server_thread(StartServer); + std::thread server_thread(StartServer, distributed::kRequestPrefetch); g_rpc_service->WaitServerReady(); int port = g_rpc_service->GetSelectedPort(); @@ -162,3 +162,24 @@ TEST(PREFETCH, CPU) { g_rpc_service.reset(nullptr); g_req_handler.reset(nullptr); } + +TEST(COMPLETE, CPU) { + g_req_handler.reset(new 
distributed::RequestSendHandler(true)); + g_rpc_service.reset(new RPCSERVER_T("127.0.0.1:0", 2)); + distributed::RPCClient* client = + distributed::RPCClient::GetInstance(); + PADDLE_ENFORCE(client != nullptr); + std::thread server_thread(StartServer, distributed::kRequestSend); + g_rpc_service->WaitServerReady(); + int port = g_rpc_service->GetSelectedPort(); + std::string ep = paddle::string::Sprintf("127.0.0.1:%d", port); + client->AsyncSendComplete(ep); + client->Wait(); + + EXPECT_EQ(g_rpc_service->GetClientNum(), 1); + + g_rpc_service->ShutDown(); + server_thread.join(); + g_rpc_service.reset(nullptr); + g_req_handler.reset(nullptr); +} diff --git a/paddle/fluid/operators/math/im2col.cc b/paddle/fluid/operators/math/im2col.cc index a50b9ace39249f4f899a46e171bbdced033b46bc..bb55ce21b0599bcff4db138a46c9c700f6e52422 100644 --- a/paddle/fluid/operators/math/im2col.cc +++ b/paddle/fluid/operators/math/im2col.cc @@ -40,22 +40,47 @@ class Im2ColFunctordims()[1]; int filter_width = col->dims()[2]; - int col_height = col->dims()[3]; - int col_width = col->dims()[4]; + int output_height = col->dims()[3]; + int output_width = col->dims()[4]; int channels_col = im_channels * filter_height * filter_width; const T* im_data = im.data(); T* col_data = col->data(); + // TODO(TJ): change me to template + // further optimaze: + // 1. padding != 1 + // 2. could also support stride_h != 1 + if (stride[0] == 1 && stride[1] == 1 && dilation[0] == 1 && + dilation[1] == 1 && padding[0] == 0 && padding[1] == 0) { + int col_matrix_width = output_width * output_height; + size_t copy_size = sizeof(T) * output_width; + for (int oh = 0; oh < output_height; ++oh) { + const T* im_data_start = im_data + oh * im_width; + T* dst_data = col_data + oh * output_width; + for (int ic = 0; ic < im_channels; ++ic) { + const T* src_data = im_data_start + ic * im_height * im_width; + for (int kh = 0; kh < filter_height; ++kh) { + for (int kw = 0; kw < filter_width; ++kw) { + std::memcpy(dst_data, src_data + kw, copy_size); + dst_data = dst_data + col_matrix_width; + } + src_data = src_data + im_width; + } + } + } + return; + } + for (int c = 0; c < channels_col; ++c) { int w_offset = c % filter_width; int h_offset = (c / filter_width) % filter_height; int c_im = c / (filter_width * filter_height); - for (int h = 0; h < col_height; ++h) { + for (int h = 0; h < output_height; ++h) { int im_row_idx = h * stride[0] - padding[0] + h_offset * dilation[0]; - for (int w = 0; w < col_width; ++w) { + for (int w = 0; w < output_width; ++w) { int im_col_idx = w * stride[1] - padding[1] + w_offset * dilation[1]; - int col_idx = (c * col_height + h) * col_width + w; + int col_idx = (c * output_height + h) * output_width + w; int im_idx = (im_row_idx + c_im * im_height) * im_width + im_col_idx; col_data[col_idx] = (im_row_idx < 0 || im_row_idx >= im_height || diff --git a/paddle/fluid/operators/math/im2col_test.cc b/paddle/fluid/operators/math/im2col_test.cc index 8e3f0f286823c383bb0c44d0e7887040ec9b20a0..db61f68db3e492d98cfa43576fa1900bffc8674d 100644 --- a/paddle/fluid/operators/math/im2col_test.cc +++ b/paddle/fluid/operators/math/im2col_test.cc @@ -160,8 +160,80 @@ void testIm2col() { delete context; } +void testIm2colCPU(int ic, int ih, int iw, int fh, int fw, int ph, int pw) { + paddle::framework::Tensor input; + paddle::framework::Tensor output; + paddle::framework::Tensor ref_output; + std::vector padding({ph, pw}); + std::vector stride({1, 1}); // stride_y, stride_x + std::vector dilation({1, 1}); // dilation_y, dilation_x + int 
output_height = (ih - fh + padding[0] * 2) / stride[0] + 1; + int output_width = (iw - fw + padding[1] * 2) / stride[1] + 1; + float* input_ptr = + input.mutable_data({ic, ih, iw}, paddle::platform::CPUPlace()); + for (int i = 0; i < input.numel(); ++i) { + input_ptr[i] = static_cast(i + 1); + } + + paddle::platform::CPUPlace place; + paddle::platform::CPUDeviceContext context(place); + output.mutable_data({ic, fh, fw, output_height, output_width}, place); + ref_output.mutable_data({ic, fh, fw, output_height, output_width}, + place); + paddle::operators::math::Im2ColFunctor< + paddle::operators::math::ColFormat::kCFO, + paddle::platform::CPUDeviceContext, float> + im2col; + im2col(context, input, dilation, stride, padding, &output); + auto ref_im2col = [&]( + const paddle::framework::Tensor& im, const std::vector& dilation, + const std::vector& stride, const std::vector& padding, + paddle::framework::Tensor* col) { + int im_channels = im.dims()[0]; + int im_height = im.dims()[1]; + int im_width = im.dims()[2]; + int filter_height = col->dims()[1]; + int filter_width = col->dims()[2]; + int output_height = col->dims()[3]; + int output_width = col->dims()[4]; + int channels_col = im_channels * filter_height * filter_width; + + const float* im_data = im.data(); + float* col_data = col->data(); + for (int c = 0; c < channels_col; ++c) { + int w_offset = c % filter_width; + int h_offset = (c / filter_width) % filter_height; + int c_im = c / (filter_width * filter_height); + for (int h = 0; h < output_height; ++h) { + int im_row_idx = h * stride[0] - padding[0] + h_offset * dilation[0]; + for (int w = 0; w < output_width; ++w) { + int im_col_idx = w * stride[1] - padding[1] + w_offset * dilation[1]; + int col_idx = (c * output_height + h) * output_width + w; + int im_idx = (im_row_idx + c_im * im_height) * im_width + im_col_idx; + col_data[col_idx] = (im_row_idx < 0 || im_row_idx >= im_height || + im_col_idx < 0 || im_col_idx >= im_width) + ? 0.f + : im_data[im_idx]; + } + } + } + }; + + ref_im2col(input, dilation, stride, padding, &ref_output); + + float* out_cfo_ptr = output.data(); + float* out_ref_ptr = ref_output.data(); + for (int i = 0; i < output.numel(); ++i) { + EXPECT_EQ(out_cfo_ptr[i], out_ref_ptr[i]); + } +} + TEST(math, im2col) { testIm2col(); + testIm2colCPU(/*ic*/ 3, /*ih*/ 5, /*iw*/ 5, /*fh*/ 3, /*fw*/ 2, /*ph*/ 0, + /*pw*/ 0); + testIm2colCPU(/*ic*/ 2, /*ih*/ 5, /*iw*/ 4, /*fh*/ 3, /*fw*/ 3, /*ph*/ 1, + /*pw*/ 1); #ifdef PADDLE_WITH_CUDA testIm2col(); diff --git a/paddle/fluid/operators/send_recv_util.h b/paddle/fluid/operators/send_recv_util.h index deab005149027caffa962783df944fad7110382f..dc26c53c64f06ce21856fb5af8f2a5eb3fc75bb7 100644 --- a/paddle/fluid/operators/send_recv_util.h +++ b/paddle/fluid/operators/send_recv_util.h @@ -14,6 +14,7 @@ limitations under the License. */ #pragma once #include +#include "paddle/fluid/framework/ir/node.h" namespace paddle { namespace operators { @@ -22,7 +23,10 @@ inline bool NeedSend(const framework::Scope& scope, const std::string& varname) { // dummy variable is only used in parallel executor to represent // some dependency relationship, we don't need to send/recv it. - if (varname == "dummy") return false; + // TODO(paddle-dev): Why would parallel executor logic leaked into here? 
+ if (varname.find(framework::ir::Node::kControlDepVarName) != + std::string::npos) + return false; auto* var = scope.FindVar(varname); PADDLE_ENFORCE_NOT_NULL(var, "Can not find variable '%s' in the send side.", varname); diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 3e13e7b1ffebf92301df69084b058ca55783e578..ee1c8d46ddfb4f0c09591bb78dc720555dc735b4 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -498,10 +498,7 @@ All parameter, weight, gradient are variables in Paddle. py::class_(m, "Executor") .def(py::init()) -#ifdef PADDLE_WITH_DISTRIBUTE - .def("begin_pass", &Executor::BeginPass) - .def("end_pass", &Executor::EndPass) -#endif + .def("close", &Executor::Close) .def("run", [](Executor &self, const ProgramDesc &prog, Scope *scope, int block_id, bool create_local_scope, bool create_vars) { pybind11::gil_scoped_release release; diff --git a/python/paddle/fluid/executor.py b/python/paddle/fluid/executor.py index f9e600cb4cb252baead87025db0e0db71e8169d2..4178971398c953236bf8de4d5cb6e93d0e33380c 100644 --- a/python/paddle/fluid/executor.py +++ b/python/paddle/fluid/executor.py @@ -247,6 +247,7 @@ class Executor(object): p.set_place(place) self.executor = core.Executor(p) self.program_caches = dict() + self._closed = False def as_lodtensor(self, data): """ @@ -348,11 +349,23 @@ class Executor(object): ] return outs - def begin_pass(self): - self.executor.begin_pass() + def close(self): + """ + Close this executor. - def end_pass(self): - self.executor.end_pass() + You can no long use this executor after calling this method. + For the distributed training, this method would free the resource on PServers related to + the current Trainer. + + Example: + >>> cpu = core.CPUPlace() + >>> exe = Executor(cpu) + >>> ... + >>> exe.close() + """ + if not self._closed: + self.executor.close() + self._closed = True def run(self, program=None, @@ -405,6 +418,10 @@ class Executor(object): >>> feed={'X': x}, >>> fetch_list=[loss.name]) """ + + if self._closed: + raise RuntimeError("Attempted to use a closed Executor") + if feed is None: feed = {} if not isinstance(feed, dict): diff --git a/python/paddle/fluid/io.py b/python/paddle/fluid/io.py index e1d26474e63c8da174bebe3b639f356c2ef655b4..1ec670de07062057ba09e15ac1e4da026d035a53 100644 --- a/python/paddle/fluid/io.py +++ b/python/paddle/fluid/io.py @@ -790,101 +790,3 @@ def get_parameter_value_by_name(name, executor, program=None): program = default_main_program() var = program.global_block().var(name) return get_parameter_value(var, executor) - - -def get_test_program(filelist, program=None, startup_program=None): - """ - Transpile current train program to a program to read test dataset - if the program is using reader ops like "open_files_op". 
- """ - - def _copy_reader_var_(block, var, new_name=None): - if new_name == None: - new_name = var.name - new_var = block.create_var( - name=str(new_name), type=core.VarDesc.VarType.READER) - new_var.desc.set_shapes(var.desc.shapes()) - new_var.desc.set_dtypes(var.desc.dtypes()) - new_var.persistable = True - return new_var - - def _get_test_reader_name(train_reader_name): - return train_reader_name + "_test" - - def _is_reader_op(op): - block = op.block - if "Out" in op.output_names: - reader_out = block.vars[op.output("Out")[0]] - if reader_out.type == core.VarDesc.VarType.READER: - return True - return False - - if program == None: - program = default_main_program() - if startup_program == None: - startup_program = default_startup_program() - startup_block = startup_program.global_block() - - # 1. find out the orignal reader var name - startup_reader_op_list = [] - - for op in startup_block.ops: - if _is_reader_op(op): - startup_reader_op_list.append(op) - - if len(startup_reader_op_list) == 0: - return program - - root_reader_op = startup_reader_op_list[0] - train_test_reader_map = {} - # 2. add operators to startup to read open and read test data files - for op in startup_reader_op_list: - assert (len(op.output("Out")) == 1) - train_reader_name = op.output("Out")[0] - train_reader = startup_block.vars[train_reader_name] - test_reader = _copy_reader_var_( - startup_block, - train_reader, - new_name=_get_test_reader_name(train_reader_name)) - train_test_reader_map[train_reader.name] = test_reader - - test_op_inputs = {} - for name in op.input_names: - train_arg_names = op.input(name) - test_arg_vars = [] - for arg_name in train_arg_names: - arg_var = train_test_reader_map[ - arg_name] if name == "UnderlyingReader" else startup_block.vars[ - arg_name] - test_arg_vars.append(arg_var) - test_op_inputs[name] = test_arg_vars - - test_op = startup_block.append_op( - type=op.type, - inputs=test_op_inputs, - outputs={'Out': [test_reader]}, - attrs=op.attrs) - # root reader op's filelist attr for read test files - if op.type == root_reader_op.type: - test_op.set_attr("file_names", filelist) - if op.type == "create_multi_pass_reader": - test_op.set_attr("pass_num", 1) - - # 3. rename reader vars in inference program to different name - # to avoid read from train data. 
- main_block = program.global_block() - for var in main_block.vars.values(): - if var.type == core.VarDesc.VarType.READER: - main_block._rename_var( - str(var.name), str(_get_test_reader_name(var.name))) - - for op in main_block.ops: - if op.type == root_reader_op.type: - test_op.set_attr("file_names", filelist) - if op.type == "create_multi_pass_reader": - test_op.set_attr("pass_num", 1) - - startup_program._sync_with_cpp() - program._sync_with_cpp() - - return program diff --git a/python/paddle/fluid/layers/control_flow.py b/python/paddle/fluid/layers/control_flow.py index 79a11a30d602cb33c2583873e0d0f2e15e0fcb8c..f05ae6d5d1900560e37370121bf64f1fcab14357 100644 --- a/python/paddle/fluid/layers/control_flow.py +++ b/python/paddle/fluid/layers/control_flow.py @@ -23,25 +23,17 @@ from ops import logical_and, logical_not, logical_or import numpy __all__ = [ - 'split_lod_tensor', - 'merge_lod_tensor', 'While', 'Switch', - 'lod_rank_table', - 'max_sequence_len', - 'lod_tensor_to_array', - 'array_to_lod_tensor', 'increment', 'array_write', 'create_array', 'less_than', 'equal', 'array_read', - 'shrink_memory', 'array_length', 'IfElse', 'DynamicRNN', - 'ConditionalBlock', 'StaticRNN', 'reorder_lod_tensor_by_rank', 'ParallelDo', @@ -1457,7 +1449,7 @@ class IfElse(object): if self.status == IfElse.OUT_IF_ELSE_BLOCKS: raise ValueError("input must in true/false blocks") if id(x) not in self.input_table: - parent_block = self.parent_block() + parent_block = self._parent_block() out_true = parent_block.create_var( name=unique_name.generate('ifelse_input' + self.helper.name), dtype=x.dtype) @@ -1483,7 +1475,7 @@ class IfElse(object): else: return out_false - def parent_block(self): + def _parent_block(self): current_block = self.helper.main_program.current_block() return self.helper.main_program.block(current_block.parent_idx) @@ -1499,7 +1491,7 @@ class IfElse(object): out_table = self.output_table[1 if self.status == self.IN_IF_ELSE_TRUE_BLOCKS else 0] - parent_block = self.parent_block() + parent_block = self._parent_block() for each_out in outs: if not isinstance(each_out, Variable): raise TypeError("Each output should be a variable") diff --git a/python/paddle/fluid/tests/demo/text_classification/.gitignore b/python/paddle/fluid/tests/demo/file_reader/.gitignore similarity index 100% rename from python/paddle/fluid/tests/demo/text_classification/.gitignore rename to python/paddle/fluid/tests/demo/file_reader/.gitignore diff --git a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py b/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py similarity index 94% rename from python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py rename to python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py index 8244617711138d590193b2898de5d2f3aeb1e11e..b839e14889884bca8d27586aa8c1d76fba3458c1 100644 --- a/python/paddle/fluid/tests/demo/text_classification/convert_data_to_recordio.py +++ b/python/paddle/fluid/tests/demo/file_reader/convert_data_to_recordio.py @@ -35,7 +35,7 @@ if len(sys.argv) == 1: word_dict = paddle.dataset.imdb.word_dict() else: word_dict = load_vocab(sys.argv[1]) -word_dict[""] = len(word_dict) + word_dict[""] = len(word_dict) print "Dict dim = ", len(word_dict) # input text data @@ -50,7 +50,7 @@ feeder = fluid.DataFeeder(feed_list=[data, label], place=fluid.CPUPlace()) BATCH_SIZE = 128 train_reader = paddle.batch( paddle.reader.shuffle( - paddle.dataset.imdb.train(word_dict), buf_size=10000), + 
paddle.dataset.imdb.train(word_dict), buf_size=25000), batch_size=BATCH_SIZE) test_reader = paddle.batch( diff --git a/python/paddle/fluid/tests/demo/file_reader/train.py b/python/paddle/fluid/tests/demo/file_reader/train.py new file mode 100644 index 0000000000000000000000000000000000000000..bc3a6dc81d24afec66ed1489aead1cff79a59bca --- /dev/null +++ b/python/paddle/fluid/tests/demo/file_reader/train.py @@ -0,0 +1,138 @@ +# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle.fluid as fluid +import numpy +import sys + +TRAIN_FILES = ['train.recordio'] +TEST_FILES = ['test.recordio'] + +DICT_DIM = 5147 + +# embedding dim +emb_dim = 128 + +# hidden dim +hid_dim = 128 + +# class num +class_dim = 2 + +# epoch num +epoch_num = 10 + + +def build_program(is_train): + file_obj_handle = fluid.layers.io.open_files( + filenames=TRAIN_FILES if is_train else TEST_FILES, + shapes=[[-1, 1], [-1, 1]], + lod_levels=[1, 0], + dtypes=['int64', 'int64']) + + file_obj = fluid.layers.io.double_buffer(file_obj_handle) + + with fluid.unique_name.guard(): + + data, label = fluid.layers.read_file(file_obj) + + emb = fluid.layers.embedding(input=data, size=[DICT_DIM, emb_dim]) + + conv_3 = fluid.nets.sequence_conv_pool( + input=emb, + num_filters=hid_dim, + filter_size=3, + act="tanh", + pool_type="sqrt") + + conv_4 = fluid.nets.sequence_conv_pool( + input=emb, + num_filters=hid_dim, + filter_size=4, + act="tanh", + pool_type="sqrt") + + prediction = fluid.layers.fc(input=[conv_3, conv_4], + size=class_dim, + act="softmax") + + # cross entropy loss + cost = fluid.layers.cross_entropy(input=prediction, label=label) + + # mean loss + avg_cost = fluid.layers.mean(x=cost) + acc = fluid.layers.accuracy(input=prediction, label=label) + + if is_train: + # SGD optimizer + sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.001) + sgd_optimizer.minimize(avg_cost) + + return {'loss': avg_cost, 'log': [avg_cost, acc], 'file': file_obj_handle} + + +def main(): + train = fluid.Program() + startup = fluid.Program() + test = fluid.Program() + + with fluid.program_guard(train, startup): + train_args = build_program(is_train=True) + + with fluid.program_guard(test, startup): + test_args = build_program(is_train=False) + + use_cuda = fluid.core.is_compiled_with_cuda() + # startup + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() + exe = fluid.Executor(place=place) + exe.run(startup) + + train_exe = fluid.ParallelExecutor( + use_cuda=use_cuda, + loss_name=train_args['loss'].name, + main_program=train) + test_exe = fluid.ParallelExecutor( + use_cuda=use_cuda, main_program=test, share_vars_from=train_exe) + + fetch_var_list = [var.name for var in train_args['log']] + for epoch_id in range(epoch_num): + # train + try: + batch_id = 0 + while True: + loss, acc = map(numpy.array, + train_exe.run(fetch_list=fetch_var_list)) + print 'Train epoch', epoch_id, 'batch', batch_id, 'loss:', loss, 'acc:', acc + batch_id += 1 + except fluid.core.EOFException: + 
print 'End of epoch', epoch_id + train_args['file'].reset() + + # test + loss = [] + acc = [] + try: + while True: + loss_np, acc_np = map(numpy.array, + test_exe.run(fetch_list=fetch_var_list)) + loss.append(loss_np[0]) + acc.append(acc_np[0]) + except: + test_args['file'].reset() + print 'Test loss:', numpy.mean(loss), 'acc:', numpy.mean(acc) + + +if __name__ == '__main__': + main() diff --git a/python/paddle/fluid/tests/demo/text_classification/train.py b/python/paddle/fluid/tests/demo/text_classification/train.py deleted file mode 100644 index 281c2869d642c7fe41a386c42208ca2da1dc2891..0000000000000000000000000000000000000000 --- a/python/paddle/fluid/tests/demo/text_classification/train.py +++ /dev/null @@ -1,146 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import paddle.fluid as fluid -import numpy -import sys - -TRAIN_FILES = ['train.recordio'] -TEST_FILES = ['test.recordio'] - -DICT_DIM = 89528 - -# embedding dim -emb_dim = 128 - -# hidden dim -hid_dim = 128 - -# hidden dim2 -hid_dim2 = 96 - -# class num -class_dim = 2 - - -def network_cfg(is_train, pass_num=100): - with fluid.unique_name.guard(): - train_file_obj = fluid.layers.open_files( - filenames=TRAIN_FILES, - pass_num=pass_num, - shapes=[[-1, 1], [-1, 1]], - lod_levels=[1, 0], - dtypes=['int64', 'int64']) - - test_file_obj = fluid.layers.open_files( - filenames=TEST_FILES, - pass_num=1, - shapes=[[-1, 1], [-1, 1]], - lod_levels=[1, 0], - dtypes=['int64', 'int64']) - - if is_train: - file_obj = fluid.layers.shuffle(train_file_obj, buffer_size=1000) - else: - file_obj = test_file_obj - - file_obj = fluid.layers.double_buffer( - file_obj, - name="train_double_buffer" if is_train else 'test_double_buffer') - - data, label = fluid.layers.read_file(file_obj) - - emb = fluid.layers.embedding(input=data, size=[DICT_DIM, emb_dim]) - - # sequence conv with window size = 3 - win_size = 3 - conv_3 = fluid.nets.sequence_conv_pool( - input=emb, - num_filters=hid_dim, - filter_size=win_size, - act="tanh", - pool_type="max") - - # fc layer after conv - fc_1 = fluid.layers.fc(input=[conv_3], size=hid_dim2) - - # probability of each class - prediction = fluid.layers.fc(input=[fc_1], - size=class_dim, - act="softmax") - # cross entropy loss - cost = fluid.layers.cross_entropy(input=prediction, label=label) - - # mean loss - avg_cost = fluid.layers.mean(x=cost) - acc = fluid.layers.accuracy(input=prediction, label=label) - - if is_train: - # SGD optimizer - sgd_optimizer = fluid.optimizer.Adagrad(learning_rate=0.01) - sgd_optimizer.minimize(avg_cost) - - return { - 'loss': avg_cost, - 'log': [avg_cost, acc], - 'file': train_file_obj if is_train else test_file_obj - } - - -def main(): - train = fluid.Program() - startup = fluid.Program() - - with fluid.program_guard(train, startup): - train_args = network_cfg(is_train=True) - - test = fluid.Program() - - with fluid.program_guard(test, fluid.Program()): - test_args = network_cfg(is_train=False) - - # startup - place = 
fluid.CUDAPlace(0) - exe = fluid.Executor(place=place) - exe.run(startup) - - train_exe = fluid.ParallelExecutor( - use_cuda=True, loss_name=train_args['loss'].name, main_program=train) - - fetch_var_list = [var.name for var in train_args['log']] - for i in xrange(sys.maxint): - result = map(numpy.array, - train_exe.run(fetch_list=fetch_var_list - if i % 1000 == 0 else [])) - if len(result) != 0: - print 'Train: ', result - - if i % 1000 == 0: - test_exe = fluid.ParallelExecutor( - use_cuda=True, main_program=test, share_vars_from=train_exe) - loss = [] - acc = [] - try: - while True: - loss_np, acc_np = map( - numpy.array, test_exe.run(fetch_list=fetch_var_list)) - loss.append(loss_np[0]) - acc.append(acc_np[0]) - except: - test_args['file'].reset() - print 'TEST: ', numpy.mean(loss), numpy.mean(acc) - - -if __name__ == '__main__': - main() diff --git a/python/paddle/fluid/tests/test_if_else_op.py b/python/paddle/fluid/tests/test_if_else_op.py index 1b58925599de62510ea9048f5210bb0b7e49f933..799c31dfe5161ff6aef47601f1b6f6e38885760b 100644 --- a/python/paddle/fluid/tests/test_if_else_op.py +++ b/python/paddle/fluid/tests/test_if_else_op.py @@ -19,6 +19,10 @@ from paddle.fluid.executor import Executor from paddle.fluid.optimizer import MomentumOptimizer import paddle.fluid.core as core import paddle.fluid as fluid +from paddle.fluid.layers.control_flow import split_lod_tensor +from paddle.fluid.layers.control_flow import merge_lod_tensor +from paddle.fluid.layers.control_flow import ConditionalBlock + import unittest import numpy as np @@ -34,11 +38,10 @@ class TestMNISTIfElseOp(unittest.TestCase): limit = layers.fill_constant(shape=[1], dtype='int64', value=5) cond = layers.less_than(x=label, y=limit) - true_image, false_image = layers.split_lod_tensor( - input=image, mask=cond) + true_image, false_image = split_lod_tensor(input=image, mask=cond) true_out = layers.create_tensor(dtype='float32') - true_cond = layers.ConditionalBlock([cond]) + true_cond = ConditionalBlock([cond]) with true_cond.block(): hidden = layers.fc(input=true_image, size=100, act='tanh') @@ -46,14 +49,14 @@ class TestMNISTIfElseOp(unittest.TestCase): layers.assign(input=prob, output=true_out) false_out = layers.create_tensor(dtype='float32') - false_cond = layers.ConditionalBlock([cond]) + false_cond = ConditionalBlock([cond]) with false_cond.block(): hidden = layers.fc(input=false_image, size=200, act='tanh') prob = layers.fc(input=hidden, size=10, act='softmax') layers.assign(input=prob, output=false_out) - prob = layers.merge_lod_tensor( + prob = merge_lod_tensor( in_true=true_out, in_false=false_out, mask=cond, x=image) loss = layers.cross_entropy(input=prob, label=label) avg_loss = layers.mean(loss) diff --git a/python/paddle/fluid/tests/unittests/test_conditional_block.py b/python/paddle/fluid/tests/unittests/test_conditional_block.py index 084b8d37386fac0366c190f5f30dd39467072498..d9f83905e6135e22f74e749857f9b0fbe464d3f4 100644 --- a/python/paddle/fluid/tests/unittests/test_conditional_block.py +++ b/python/paddle/fluid/tests/unittests/test_conditional_block.py @@ -18,14 +18,15 @@ import paddle.fluid.core as core from paddle.fluid.framework import default_startup_program, default_main_program from paddle.fluid.executor import Executor from paddle.fluid.backward import append_backward +from paddle.fluid.layers.control_flow import ConditionalBlock import numpy -class ConditionalBlock(unittest.TestCase): +class ConditionalBlockTest(unittest.TestCase): def test_forward(self): data = layers.data(name='X', shape=[1], 
dtype='float32') data.stop_gradient = False - cond = layers.ConditionalBlock(inputs=[data]) + cond = ConditionalBlock(inputs=[data]) out = layers.create_tensor(dtype='float32') with cond.block(): hidden = layers.fc(input=data, size=10) diff --git a/python/paddle/fluid/tests/unittests/test_const_value.py b/python/paddle/fluid/tests/unittests/test_const_value.py index d1075d514e9b2b692f271f10a005815a66b421fb..58ac6fa0a9a30a08a831111513777cca59062724 100644 --- a/python/paddle/fluid/tests/unittests/test_const_value.py +++ b/python/paddle/fluid/tests/unittests/test_const_value.py @@ -16,7 +16,7 @@ import unittest import paddle.fluid.framework as framework -class ConditionalBlock(unittest.TestCase): +class ConstantTest(unittest.TestCase): def test_const_value(self): self.assertEqual(framework.GRAD_VAR_SUFFIX, "@GRAD") self.assertEqual(framework.TEMP_VAR_NAME, "@TEMP@") diff --git a/python/paddle/fluid/tests/unittests/test_dyn_rnn.py b/python/paddle/fluid/tests/unittests/test_dyn_rnn.py index 0faed94deb4808783027d776e0f4c61da0db457a..4448de8839d7ad4ad1f70ecdc4ac94da1e619adb 100644 --- a/python/paddle/fluid/tests/unittests/test_dyn_rnn.py +++ b/python/paddle/fluid/tests/unittests/test_dyn_rnn.py @@ -17,6 +17,12 @@ import paddle import unittest import numpy +from paddle.fluid.layers.control_flow import lod_rank_table +from paddle.fluid.layers.control_flow import max_sequence_len +from paddle.fluid.layers.control_flow import lod_tensor_to_array +from paddle.fluid.layers.control_flow import array_to_lod_tensor +from paddle.fluid.layers.control_flow import shrink_memory + class TestDynRNN(unittest.TestCase): def setUp(self): @@ -38,12 +44,11 @@ class TestDynRNN(unittest.TestCase): label = fluid.layers.data(name='label', shape=[1], dtype='float32') - rank_table = fluid.layers.lod_rank_table(x=sent_emb) + rank_table = lod_rank_table(x=sent_emb) - sent_emb_array = fluid.layers.lod_tensor_to_array( - x=sent_emb, table=rank_table) + sent_emb_array = lod_tensor_to_array(x=sent_emb, table=rank_table) - seq_len = fluid.layers.max_sequence_len(rank_table=rank_table) + seq_len = max_sequence_len(rank_table=rank_table) i = fluid.layers.fill_constant(shape=[1], dtype='int64', value=0) i.stop_gradient = False @@ -66,7 +71,7 @@ class TestDynRNN(unittest.TestCase): mem = fluid.layers.array_read(array=mem_array, i=i) ipt = fluid.layers.array_read(array=sent_emb_array, i=i) - mem = fluid.layers.shrink_memory(x=mem, i=i, table=rank_table) + mem = shrink_memory(x=mem, i=i, table=rank_table) hidden = fluid.layers.fc(input=[mem, ipt], size=100, act='tanh') @@ -75,8 +80,7 @@ class TestDynRNN(unittest.TestCase): fluid.layers.array_write(x=hidden, i=i, array=mem_array) fluid.layers.less_than(x=i, y=seq_len, cond=cond) - all_timesteps = fluid.layers.array_to_lod_tensor( - x=out, table=rank_table) + all_timesteps = array_to_lod_tensor(x=out, table=rank_table) last = fluid.layers.sequence_last_step(input=all_timesteps) logits = fluid.layers.fc(input=last, size=1, act=None) loss = fluid.layers.sigmoid_cross_entropy_with_logits( diff --git a/python/paddle/fluid/tests/unittests/test_lod_rank_table.py b/python/paddle/fluid/tests/unittests/test_lod_rank_table.py index bac5e502318397b43e9867d5fc9e4e8cd33394b8..16e85830ffa51ec428951570cc7a038f3d10c873 100644 --- a/python/paddle/fluid/tests/unittests/test_lod_rank_table.py +++ b/python/paddle/fluid/tests/unittests/test_lod_rank_table.py @@ -12,7 +12,8 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from paddle.fluid.layers import lod_rank_table, data
+from paddle.fluid.layers import data
+from paddle.fluid.layers.control_flow import lod_rank_table
 from paddle.fluid.executor import Executor
 import paddle.fluid.core as core
 import numpy
diff --git a/python/paddle/fluid/tests/unittests/test_lod_tensor_array_ops.py b/python/paddle/fluid/tests/unittests/test_lod_tensor_array_ops.py
index cebe6997bb4152519dabbabfc0404d6036bc4e65..5a4580116bc7009c73f1de14a265bf2cea5acf9b 100644
--- a/python/paddle/fluid/tests/unittests/test_lod_tensor_array_ops.py
+++ b/python/paddle/fluid/tests/unittests/test_lod_tensor_array_ops.py
@@ -20,6 +20,11 @@ from paddle.fluid.framework import Program, program_guard
 from paddle.fluid.executor import Executor
 from paddle.fluid.backward import append_backward
 
+from paddle.fluid.layers.control_flow import lod_rank_table
+from paddle.fluid.layers.control_flow import max_sequence_len
+from paddle.fluid.layers.control_flow import lod_tensor_to_array
+from paddle.fluid.layers.control_flow import array_to_lod_tensor
+
 
 class TestCPULoDTensorArrayOps(unittest.TestCase):
     def place(self):
@@ -137,13 +142,13 @@ class TestCPULoDTensorArrayOps(unittest.TestCase):
         with program_guard(program):
             x = layers.data(name='x', shape=[10])
             x.persistable = True
-            table = layers.lod_rank_table(x, level=level)
-            max_len = layers.max_sequence_len(table)
+            table = lod_rank_table(x, level=level)
+            max_len = max_sequence_len(table)
             max_len.persistable = True
-            array = layers.lod_tensor_to_array(x, table)
+            array = lod_tensor_to_array(x, table)
             array.persistable = True
 
-            result = layers.array_to_lod_tensor(array, table)
+            result = array_to_lod_tensor(array, table)
             result.persistable = True
         exe = Executor(place)
         scope = core.Scope()
@@ -181,9 +186,9 @@ class TestCPULoDTensorArrayOpGrad(unittest.TestCase):
         with program_guard(program):
             x = layers.data(
                 name='x', shape=[1], dtype='float32', stop_gradient=False)
-            table = layers.lod_rank_table(x, level=0)
-            array = layers.lod_tensor_to_array(x, table)
-            result = layers.array_to_lod_tensor(array, table)
+            table = lod_rank_table(x, level=0)
+            array = lod_tensor_to_array(x, table)
+            result = array_to_lod_tensor(array, table)
 
             mean = layers.mean(result)
 
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
index b21e16439a5070e5f6d763e1617d4cfffe8bd618..76389d916fc39f470a22aed4792bf7b754600436 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
@@ -107,44 +107,24 @@ class TestMNIST(TestParallelExecutorBase):
         label = np.ones(shape=[32, 1], dtype='int64')
         return img, label
 
-    # simple_fc
-    def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
+    def _compare_reduce_and_allreduce(self, model, use_cuda, random_data=True):
         if use_cuda and not core.is_compiled_with_cuda():
             return
-        self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
         self.check_network_convergence(
-            simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
-
-        img, label = self._init_data()
-
+            model, use_cuda=use_cuda, use_reduce=True)
         self.check_network_convergence(
-            simple_fc_net,
-            feed_dict={"image": img,
-                       "label": label},
-            use_cuda=use_cuda,
-            use_reduce=use_reduce)
+            model, use_cuda=use_cuda, allow_op_delay=True, use_reduce=True)
 
-    def check_simple_fc_convergence_with_Reduce(self, use_cuda):
-        if use_cuda and not core.is_compiled_with_cuda():
-            return
-        self.check_network_convergence(
-            simple_fc_net, use_cuda=use_cuda, use_reduce=True)
-        self.check_network_convergence(
-            simple_fc_net,
-            use_cuda=use_cuda,
-            allow_op_delay=True,
-            use_reduce=True)
-
-        img, label = self._init_data()
+        img, label = self._init_data(random_data)
 
         all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
-            simple_fc_net,
+            model,
            feed_dict={"image": img,
                        "label": label},
             use_cuda=use_cuda,
             use_reduce=False)
         reduce_first_loss, reduce_last_loss = self.check_network_convergence(
-            simple_fc_net,
+            model,
             feed_dict={"image": img,
                        "label": label},
             use_cuda=use_cuda,
@@ -153,7 +133,24 @@ class TestMNIST(TestParallelExecutorBase):
         for loss in zip(all_reduce_first_loss, reduce_first_loss):
             self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
         for loss in zip(all_reduce_last_loss, reduce_last_loss):
-            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
+            self.assertAlmostEquals(loss[0], loss[1], delta=1e-4)
+
+    # simple_fc
+    def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
+        if use_cuda and not core.is_compiled_with_cuda():
+            return
+        self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
+        self.check_network_convergence(
+            simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
+
+        img, label = self._init_data()
+
+        self.check_network_convergence(
+            simple_fc_net,
+            feed_dict={"image": img,
+                       "label": label},
+            use_cuda=use_cuda,
+            use_reduce=use_reduce)
 
     def test_simple_fc(self):
         # use_cuda
@@ -162,8 +159,8 @@ class TestMNIST(TestParallelExecutorBase):
 
     def test_simple_fc_with_new_strategy(self):
         # use_cuda, use_reduce
-        self.check_simple_fc_convergence_with_Reduce(True)
-        self.check_simple_fc_convergence_with_Reduce(False)
+        self._compare_reduce_and_allreduce(simple_fc_net, True)
+        self._compare_reduce_and_allreduce(simple_fc_net, False)
 
     def check_simple_fc_parallel_accuracy(self, use_cuda):
         if use_cuda and not core.is_compiled_with_cuda():
             return
@@ -209,39 +206,13 @@ class TestMNIST(TestParallelExecutorBase):
                        "label": label},
             use_cuda=use_cuda)
 
-    def check_batchnorm_fc_convergence_use_reduce(self, use_cuda):
-        if use_cuda and not core.is_compiled_with_cuda():
-            return
-        self.check_network_convergence(
-            fc_with_batchnorm, use_cuda=use_cuda, use_reduce=True)
-
-        img, label = self._init_data()
-
-        all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
-            fc_with_batchnorm,
-            feed_dict={"image": img,
-                       "label": label},
-            use_cuda=use_cuda,
-            use_reduce=False)
-        reduce_first_loss, reduce_last_loss = self.check_network_convergence(
-            fc_with_batchnorm,
-            feed_dict={"image": img,
-                       "label": label},
-            use_cuda=use_cuda,
-            use_reduce=True)
-
-        for loss in zip(all_reduce_first_loss, reduce_first_loss):
-            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
-        for loss in zip(all_reduce_last_loss, reduce_last_loss):
-            self.assertAlmostEquals(loss[0], loss[1], delta=1e-4)
-
     def test_batchnorm_fc(self):
         self.check_batchnorm_fc_convergence(True)
         self.check_batchnorm_fc_convergence(False)
 
     def test_batchnorm_fc_with_new_strategy(self):
-        self.check_batchnorm_fc_convergence_use_reduce(True)
-        self.check_batchnorm_fc_convergence_use_reduce(False)
+        self._compare_reduce_and_allreduce(fc_with_batchnorm, True)
+        self._compare_reduce_and_allreduce(fc_with_batchnorm, False)
 
 
 if __name__ == '__main__':
diff --git a/python/paddle/fluid/tests/unittests/test_reorder_lod_tensor.py b/python/paddle/fluid/tests/unittests/test_reorder_lod_tensor.py
index a70321bd800bf25eeb9e5d197ea7e08626b9aede..6e1cd56b3e309fc014dc981a1e3aa841159fca15 100644
--- a/python/paddle/fluid/tests/unittests/test_reorder_lod_tensor.py
+++ b/python/paddle/fluid/tests/unittests/test_reorder_lod_tensor.py
@@ -15,6 +15,7 @@
 import unittest
 import paddle.fluid as fluid
 import paddle.fluid.core as core
+from paddle.fluid.layers.control_flow import lod_rank_table
 import numpy
 
 
@@ -34,7 +35,7 @@ class TestReorderLoDTensor(unittest.TestCase):
         dat.stop_gradient = False
         rank_dat = fluid.layers.data(
             name=cls.data_desc[1][0], shape=cls.data_desc[1][1])
-        table = fluid.layers.lod_rank_table(rank_dat)
+        table = lod_rank_table(rank_dat)
         new_dat = fluid.layers.reorder_lod_tensor_by_rank(
             x=dat, rank_table=table)
         loss = fluid.layers.reduce_sum(new_dat)
diff --git a/python/paddle/fluid/tests/unittests/test_shrink_rnn_memory.py b/python/paddle/fluid/tests/unittests/test_shrink_rnn_memory.py
index 24bc2cbaf86e8ed2c6a359c4c4d9a1e1507df746..6f0e337034d1010880514181654170316fd9db19 100644
--- a/python/paddle/fluid/tests/unittests/test_shrink_rnn_memory.py
+++ b/python/paddle/fluid/tests/unittests/test_shrink_rnn_memory.py
@@ -21,6 +21,9 @@ from paddle.fluid.framework import default_main_program, switch_main_program
 from paddle.fluid.framework import Program
 import numpy as np
 
+from paddle.fluid.layers.control_flow import shrink_memory
+from paddle.fluid.layers.control_flow import lod_rank_table
+
 
 class TestShrinkRNNMemoryBase(unittest.TestCase):
     def setUp(self):
@@ -30,15 +33,15 @@ class TestShrinkRNNMemoryBase(unittest.TestCase):
         x.stop_gradient = False
         rank_table_tensor = layers.data(
             'rank_table_tensor', shape=[1], dtype='float32', lod_level=1)
-        table = layers.lod_rank_table(x=rank_table_tensor)
+        table = lod_rank_table(x=rank_table_tensor)
         i = layers.zeros(dtype='int64', shape=[1])
-        self.mem1 = layers.shrink_memory(x=x, i=i, table=table)
+        self.mem1 = shrink_memory(x=x, i=i, table=table)
         i = layers.increment(x=i)
         i.stop_gradient = True
-        self.mem2 = layers.shrink_memory(x=self.mem1, i=i, table=table)
+        self.mem2 = shrink_memory(x=self.mem1, i=i, table=table)
         i = layers.increment(x=i)
         i.stop_gradient = True
-        self.mem3 = layers.shrink_memory(x=self.mem2, i=i, table=table)
+        self.mem3 = shrink_memory(x=self.mem2, i=i, table=table)
         mem3_mean = layers.mean(self.mem3)
         append_backward(loss=mem3_mean)
         self.x_grad = self.main_program.global_block().var('x@GRAD')
diff --git a/python/paddle/fluid/tests/unittests/test_split_and_merge_lod_tensor_op.py b/python/paddle/fluid/tests/unittests/test_split_and_merge_lod_tensor_op.py
index 0916ed7c9f1e2d6d90c6908983fdc8b177aecbb9..ea1146166d34a31efbd859318b411cea50895fe1 100644
--- a/python/paddle/fluid/tests/unittests/test_split_and_merge_lod_tensor_op.py
+++ b/python/paddle/fluid/tests/unittests/test_split_and_merge_lod_tensor_op.py
@@ -19,6 +19,8 @@ import paddle.fluid.layers as layers
 from paddle.fluid.framework import Program, program_guard
 from paddle.fluid.executor import Executor
 from paddle.fluid.backward import append_backward
+from paddle.fluid.layers.control_flow import split_lod_tensor
+from paddle.fluid.layers.control_flow import merge_lod_tensor
 
 
 class TestCPULoDTensorArrayOps(unittest.TestCase):
@@ -96,12 +98,11 @@ class TestCPULoDTensorArrayOps(unittest.TestCase):
             y = layers.data(name='y', shape=[1])
             y.persistable = True
 
-            out_true, out_false = layers.split_lod_tensor(
-                input=x, mask=y, level=level)
+            out_true, out_false = split_lod_tensor(input=x, mask=y, level=level)
             out_true.persistable = True
             out_false.persistable = True
 
-            out = layers.merge_lod_tensor(
+            out = merge_lod_tensor(
                 in_true=out_true, in_false=out_false, mask=y, x=x, level=level)
 
             out.persistable = True
@@ -142,9 +143,8 @@ class TestCPUSplitMergeLoDTensorGrad(unittest.TestCase):
 
             level = 0
 
-            out_true, out_false = layers.split_lod_tensor(
-                input=x, mask=y, level=level)
-            out = layers.merge_lod_tensor(
+            out_true, out_false = split_lod_tensor(input=x, mask=y, level=level)
+            out = merge_lod_tensor(
                 in_true=out_true, in_false=out_false, mask=y, x=x, level=level)
             mean = layers.mean(out)
 
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index fc58703eca73addca109506aa60c0099ff31e1b5..e7698d8c52b411fd0afe919625034107081726b5 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -887,7 +887,8 @@ class DistributeTranspiler(object):
         # create table optimize block in pserver program
         table_opt_op = [
            op for op in self.optimize_ops
-            if op.input("Param")[0] == self.table_name
+            if 'Param' in op.input_names and op.input("Param")[0] ==
+            self.table_name
         ][0]
         table_opt_block = pserver_program.create_block(pre_block_idx)
         # only support sgd now
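The common thread in the test changes above is that the LoD control-flow helpers (`lod_rank_table`, `max_sequence_len`, `lod_tensor_to_array`, `array_to_lod_tensor`, `shrink_memory`, `split_lod_tensor`, `merge_lod_tensor`) are no longer reached as `fluid.layers` attributes and must be imported from `paddle.fluid.layers.control_flow`. The sketch below is not part of the patch; it only distills the usage pattern the updated unit tests exercise, assuming a Fluid build at this revision.

```python
# Usage sketch only (not part of the patch): control-flow ops are imported from
# paddle.fluid.layers.control_flow rather than accessed via fluid.layers.
import paddle.fluid as fluid
from paddle.fluid.framework import Program, program_guard
from paddle.fluid.layers.control_flow import (
    lod_rank_table, max_sequence_len, lod_tensor_to_array, array_to_lod_tensor)

program = Program()
with program_guard(program):
    x = fluid.layers.data(name='x', shape=[10])
    table = lod_rank_table(x, level=0)          # rank table over LoD level 0
    max_len = max_sequence_len(table)           # longest sequence in the batch
    array = lod_tensor_to_array(x, table)       # LoDTensor -> tensor array, ordered by rank
    result = array_to_lod_tensor(array, table)  # merge back, restoring the original LoD
```

The final hunk in `distribute_transpiler.py` guards the lookup with `'Param' in op.input_names`, so optimize ops that carry no `Param` input are simply skipped rather than raising while the pserver program builds its table optimize block.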