diff --git a/benchmark/fluid/args.py b/benchmark/fluid/args.py index ed696e82f8723eba573e8affd3f25e2aa6426e63..0d5c9652de6b814627e54018366137e214726619 100644 --- a/benchmark/fluid/args.py +++ b/benchmark/fluid/args.py @@ -140,5 +140,11 @@ def parse_args(): '--use_lars', action='store_true', help='If set, use lars for optimizers, ONLY support resnet module.') + parser.add_argument( + '--reduce_strategy', + type=str, + choices=['reduce', 'all_reduce'], + default='all_reduce', + help='Specify the reduce strategy, can be reduce, all_reduce') args = parser.parse_args() return args diff --git a/benchmark/fluid/fluid_benchmark.py b/benchmark/fluid/fluid_benchmark.py index 25622ee06c69e13181f34dfffadd5e299d31c8a8..ddd9fe809853a830ca676cc98f1819f683866def 100644 --- a/benchmark/fluid/fluid_benchmark.py +++ b/benchmark/fluid/fluid_benchmark.py @@ -170,6 +170,14 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog, strategy = fluid.ExecutionStrategy() strategy.num_threads = args.cpus strategy.allow_op_delay = False + build_strategy = fluid.BuildStrategy() + if args.reduce_strategy == "reduce": + build_strategy.reduce_strategy = fluid.BuildStrategy( + ).ReduceStrategy.Reduce + else: + build_strategy.reduce_strategy = fluid.BuildStrategy( + ).ReduceStrategy.AllReduce + avg_loss = train_args[0] if args.update_method == "pserver": @@ -184,6 +192,7 @@ def train_parallel(train_args, test_args, args, train_prog, test_prog, avg_loss.name, main_program=train_prog, exec_strategy=strategy, + build_strategy=build_strategy, num_trainers=num_trainers, trainer_id=trainer_id) diff --git a/benchmark/fluid/models/mnist.py b/benchmark/fluid/models/mnist.py index cef8657ee629dcbc19221fd3440844a56627e920..f123e07fb711bd8ff67c1ecf5ec9a02c1e79eb1d 100644 --- a/benchmark/fluid/models/mnist.py +++ b/benchmark/fluid/models/mnist.py @@ -67,11 +67,14 @@ def cnn_model(data): def get_model(args, is_train, main_prog, startup_prog): # NOTE: mnist is small, we don't implement data sharding yet. - filelist = [ - os.path.join(args.data_path, f) for f in os.listdir(args.data_path) - ] + opt = None + data_file_handle = None with fluid.program_guard(main_prog, startup_prog): if args.use_reader_op: + filelist = [ + os.path.join(args.data_path, f) + for f in os.listdir(args.data_path) + ] data_file_handle = fluid.layers.open_files( filenames=filelist, shapes=[[-1, 1, 28, 28], (-1, 1)], @@ -100,7 +103,7 @@ def get_model(args, is_train, main_prog, startup_prog): if is_train: opt = fluid.optimizer.AdamOptimizer( learning_rate=0.001, beta1=0.9, beta2=0.999) - opt.minimize() + opt.minimize(avg_cost) if args.memory_optimize: fluid.memory_optimize(main_prog) diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc index bf493a3fa44e48deec734250d04b2a413c3ed9da..7c5f5bd80a937bf1a1c891155764833d7b21c5c2 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc @@ -46,7 +46,8 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node, #endif void AllReduceOpHandle::RunImpl() { - platform::RecordEvent r("all_reduce", nullptr); + platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second); + if (NoDummyInputSize() == 1) { return; // No need to all reduce when GPU count = 1; } else { diff --git a/paddle/fluid/framework/details/broadcast_op_handle.cc b/paddle/fluid/framework/details/broadcast_op_handle.cc index 1d9f1bd6e417e30f0799f0bbed1739cedb4e8fbf..4fdab5cd94358d08eac7f8b041bf16d09042f0bd 100644 --- a/paddle/fluid/framework/details/broadcast_op_handle.cc +++ b/paddle/fluid/framework/details/broadcast_op_handle.cc @@ -15,12 +15,15 @@ #include "paddle/fluid/framework/details/broadcast_op_handle.h" #include "paddle/fluid/framework/details/container_cast.h" #include "paddle/fluid/framework/details/variable_visitor.h" +#include "paddle/fluid/platform/profiler.h" namespace paddle { namespace framework { namespace details { void BroadcastOpHandle::RunImpl() { + platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second); + if (places_.size() == 1) return; // The input and output may have dummy vars. diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.cc b/paddle/fluid/framework/details/multi_devices_graph_pass.cc index d44ebbae4d4be9c79e629303805c94030b8879db..250e093a5f789dba6b06df4889c060c294d469fe 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.cc @@ -348,14 +348,31 @@ std::unique_ptr MultiDevSSAGraphBuilder::ApplyImpl( size_t cur_device_id = 0; bool is_forwarding = true; + bool is_dist_train = false; for (ir::Node *node : sorted_ops) { if (boost::get( node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())) == static_cast(OpRole::kRPC)) { - CreateRPCOp(&result, node); + int op_dev_id = CreateRPCOp(&result, node); + PADDLE_ENFORCE(op_dev_id != -1, + "Can not schedule the RPC operator to the right place."); + if (node->Op()->Type() == "recv") { + auto recv_vars_attr = + boost::get>(node->Op()->GetNullableAttr( + OpProtoAndCheckerMaker::OpRoleVarAttrName())); + PADDLE_ENFORCE(recv_vars_attr.size() == 2UL); // [parameter, gradient] + if (recv_vars_attr[0].find(".block") == std::string::npos) { + bcast_var_name_set[op_dev_id].emplace(recv_vars_attr[0]); + } + } + is_dist_train = true; } else if (IsDistTrainOp(node, send_vars, recv_vars)) { - CreateDistTrainOp(&result, node); + int op_dev_id = CreateDistTrainOp(&result, node); + if (node->Op()->Type() == "concat") { + auto origin_param_name = node->Op()->OutputArgumentNames()[0]; + bcast_var_name_set[op_dev_id].emplace(origin_param_name); + } } else if (IsScaleLossOp(node)) { // user can customize loss@grad if not use_default_grad_scale_ if (strategy_.gradient_scale_ != @@ -414,7 +431,9 @@ std::unique_ptr MultiDevSSAGraphBuilder::ApplyImpl( CreateReduceOp(&result, g_name, cur_device_id); graph->Get(kShardedVarDevice) .emplace(g_name, cur_device_id); - bcast_var_name_set[cur_device_id].emplace(p_name); + if (!is_dist_train) { + bcast_var_name_set[cur_device_id].emplace(p_name); + } break; case BuildStrategy::ReduceStrategy::kAllReduce: if (IsSparseGradient(g_name)) { @@ -436,14 +455,19 @@ std::unique_ptr MultiDevSSAGraphBuilder::ApplyImpl( } } } - bool use_gpu = false; #ifdef PADDLE_WITH_CUDA use_gpu = nccl_ctxs_ != nullptr; #endif - if (use_gpu && strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) { - // Insert BCast Ops + // Insert broadcast operators principle: + // 1. Broadcast optimized parameters in Reduce strategy; + // 2. No need broadcast optimized parameters in AllReduce strategy because of + // the optimization sub-graph would be run on every GPU; + // 3. Allways broadcast received parameters in Distribute Training. + if ((use_gpu && + strategy_.reduce_ == BuildStrategy::ReduceStrategy::kReduce) || + is_dist_train) { for (size_t dev_id = 0; dev_id < bcast_var_name_set.size(); ++dev_id) { auto &to_bcast_set = bcast_var_name_set[dev_id]; for (auto &bcast_name : to_bcast_set) { @@ -675,8 +699,8 @@ VarHandle *MultiDevSSAGraphBuilder::CreateReduceOp(ir::Graph *result, return var; } -void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, - ir::Node *node) const { +int MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, + ir::Node *node) const { int op_dev_id = -1; std::vector input_var_names; std::vector output_var_names; @@ -719,6 +743,7 @@ void MultiDevSSAGraphBuilder::CreateDistTrainOp(ir::Graph *result, node->Op()->Type()); CreateComputationalOp(result, node, op_dev_id); + return op_dev_id; } void SetOpInputsAllPlaces(ir::Graph *result, ir::Node *node, int num_places) { @@ -737,8 +762,8 @@ void SetOpInputsAllPlaces(ir::Graph *result, ir::Node *node, int num_places) { } // Create RPC related op handles that connects its in ops and out ops. -void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result, - ir::Node *node) const { +int MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result, + ir::Node *node) const { int op_dev_id = -1; if (node->Op()->Type() == "send") { // TODO(paddle-dev): getting the first var is not safe. @@ -824,6 +849,7 @@ void MultiDevSSAGraphBuilder::CreateRPCOp(ir::Graph *result, CreateOpOutput(result, op_handle, new_node, p, outvar_dev_id); } } + return op_dev_id; } bool MultiDevSSAGraphBuilder::IsScaleLossOp(ir::Node *node) const { diff --git a/paddle/fluid/framework/details/multi_devices_graph_pass.h b/paddle/fluid/framework/details/multi_devices_graph_pass.h index ac6d9c5a64cfde60f75c76dae0a30cc7d735e996..1ca8c4b855f9468589e537245380451a91a50b14 100644 --- a/paddle/fluid/framework/details/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/details/multi_devices_graph_pass.h @@ -54,8 +54,8 @@ class MultiDevSSAGraphBuilder : public ir::Pass { bool IsScaleLossOp(ir::Node *node) const; - void CreateRPCOp(ir::Graph *result, ir::Node *node) const; - void CreateDistTrainOp(ir::Graph *result, ir::Node *node) const; + int CreateRPCOp(ir::Graph *result, ir::Node *node) const; + int CreateDistTrainOp(ir::Graph *result, ir::Node *node) const; /** * Is this operator as the end-point operator before/after send operator. diff --git a/paddle/fluid/framework/details/reduce_op_handle.cc b/paddle/fluid/framework/details/reduce_op_handle.cc index 6c7e5c1fb06620b1c071b00fcfcc1b4a29bf8d62..7fc06f234d42a992328c0b6164f17945d8075c28 100644 --- a/paddle/fluid/framework/details/reduce_op_handle.cc +++ b/paddle/fluid/framework/details/reduce_op_handle.cc @@ -27,7 +27,8 @@ namespace framework { namespace details { void ReduceOpHandle::RunImpl() { - platform::RecordEvent r("reduce", nullptr); + platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second); + if (places_.size() == 1) return; // the input and output may have dummy var. auto in_var_handles = DynamicCast(inputs_); diff --git a/paddle/fluid/framework/details/scale_loss_grad_op_handle.cc b/paddle/fluid/framework/details/scale_loss_grad_op_handle.cc index 609e18581957f62b040e04e937873b7a8fa5785a..ba243979b34aa1f683de707525403becaf0a1c00 100644 --- a/paddle/fluid/framework/details/scale_loss_grad_op_handle.cc +++ b/paddle/fluid/framework/details/scale_loss_grad_op_handle.cc @@ -51,7 +51,7 @@ void ScaleLossGradOpHandle::RunImpl() { ->stream(); memory::Copy(boost::get(place_), tmp, platform::CPUPlace(), &coeff_, sizeof(float), stream); - VLOG(1) << place_ << "RUN Scale loss grad op"; + VLOG(10) << place_ << "RUN Scale loss grad op"; }); #endif } diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 20fc08e21dadc12be8903476df374abb5caecf61..8bc30fc123163983f4bddc19af489920db93e0c0 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -683,7 +683,6 @@ All parameter, weight, gradient are variables in Paddle. const std::string &, Scope *, std::vector &, const ExecutionStrategy &, const BuildStrategy &, size_t, size_t>()) - .def("_bcast_params", &ParallelExecutor::BCastParamsToDevices) // NOTE: even we return a vec* to Python use reference policy. // We still cannot get local_scope from this vector, since the element // of vec will be freed by Python GC. We can only return Scope* diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index bd9f8b3c356ca1e43923d42beebaa5ab98158084..44af29d3390e35129d0ee65b31eacad6b28a9d60 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -142,11 +142,6 @@ class ParallelExecutor(object): main = main if main else framework.default_main_program() if scope == None: scope = executor.global_scope() - # FIXME(Yancey1989): it's a temporary approach to determinate the distribute - # train program, call self.bcast_param() at the end of each mini-batch. - self.is_dist = True if "recv" in [ - op.type for op in main.global_block().ops - ] else False if share_vars_from and not isinstance(share_vars_from, ParallelExecutor): @@ -286,21 +281,11 @@ class ParallelExecutor(object): self.executor.run(fetch_list, fetch_var_name) arr = self.scope.find_var(fetch_var_name).get_lod_tensor_array() - if self.is_dist: - self._bcast_params() - if return_numpy: return executor.as_numpy(arr) return [arr[i] for i in range(len(arr))] - def _bcast_params(self): - """ - Broadcast the parameters to other devices. It is used during - distributed training. - """ - self.executor._bcast_params(set(self.persistable_vars)) - @property def device_count(self): return len(self._act_places)