diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 3e1d61813ca83ebdf9435036117e79abe501b24b..acc71396b441149988f654843482a9e292977de3 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -181,13 +181,14 @@ std::vector<Scope *> &ParallelExecutor::GetLocalScopes() {
   return member_->local_scopes_;
 }
 
-ParallelExecutor::ParallelExecutor(
-    const std::vector<platform::Place> &places,
-    const std::unordered_set<std::string> &bcast_vars,
-    const std::string &loss_var_name, Scope *scope,
-    const std::vector<Scope *> &local_scopes,
-    const ExecutionStrategy &exec_strategy, const BuildStrategy &build_strategy,
-    ir::Graph *graph)
+ParallelExecutor::ParallelExecutor(const std::vector<platform::Place> &places,
+                                   const std::vector<std::string> &bcast_vars,
+                                   const std::string &loss_var_name,
+                                   Scope *scope,
+                                   const std::vector<Scope *> &local_scopes,
+                                   const ExecutionStrategy &exec_strategy,
+                                   const BuildStrategy &build_strategy,
+                                   ir::Graph *graph)
     : member_(new ParallelExecutorPrivate(places)) {
   member_->global_scope_ = scope;
   member_->use_cuda_ = exec_strategy.use_cuda_;
@@ -254,9 +255,23 @@ ParallelExecutor::ParallelExecutor(
     PADDLE_THROW("Not compiled with CUDA");
 #endif
   }
-  if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
-    BCastParamsToDevices(bcast_vars);
+  // broadcast parameters from the 0th device to others:
+  auto need_broadcast = [&]() -> bool {
+    if (build_strategy.num_trainers_ > 1) {
+      // 1. num_trainers would be greater than 1 for nccl distributed training.
+      return true;
+    } else if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
+      // 2. Only one trainer process, but ParallelExecutor holds multiple
+      // devices.
+      return true;
+    }
+    return false;
+  };
+
+  if (need_broadcast()) {
+    BCastParamsToDevices(bcast_vars, build_strategy.trainer_id_);
   }
+
   // Startup Program has been run. All local scopes has correct parameters.
 
   // Step 2. Convert main_program to SSA form and dependency graph. Also, insert
@@ -338,7 +353,7 @@
 }
 
 void ParallelExecutor::BCastParamsToDevices(
-    const std::unordered_set<std::string> &vars) const {
+    const std::vector<std::string> &vars, int trainer_id) const {
   // the initializing bcast, all vars would be bcast from device(0).
   for (auto &var : vars) {
     framework::Variable *main_var = member_->local_scopes_[0]->FindVar(var);
@@ -362,7 +377,7 @@
       auto place = member_->places_[i];
       void *buffer;
 
-      if (i == 0) {
+      if (i == 0 && trainer_id == 0) {
         buffer = const_cast<void *>(main_tensor.data<void>());
       } else {
         auto local_scope = member_->local_scopes_[i];
diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h
index ddf60b39466e72822142e1dad2cfe9a97b6cf6f2..d4658b9623fe8c23b6a8b2903e3c48d794ba1652 100644
--- a/paddle/fluid/framework/parallel_executor.h
+++ b/paddle/fluid/framework/parallel_executor.h
@@ -14,9 +14,11 @@ limitations under the License.
 */
 
 #pragma once
 
+#include
 #include
 #include
 #include
+#include
 #include
 #include "paddle/fluid/framework/details/build_strategy.h"
@@ -45,7 +47,7 @@ class ParallelExecutor {
 
  public:
   explicit ParallelExecutor(const std::vector<platform::Place> &places,
-                            const std::unordered_set<std::string> &bcast_vars,
+                            const std::vector<std::string> &bcast_vars,
                             const std::string &loss_var_name, Scope *scope,
                             const std::vector<Scope *> &local_scopes,
                             const ExecutionStrategy &exec_strategy,
@@ -70,7 +72,10 @@ class ParallelExecutor {
            const std::string &fetched_var_name);
 
  private:
-  void BCastParamsToDevices(const std::unordered_set<std::string> &vars) const;
+  // broadcast the parameters from the 0th device.
+  // trainer_id is the trainer index in nccl distributed training.
+  void BCastParamsToDevices(const std::vector<std::string> &vars,
+                            int trainer_id = 0) const;
   bool EnableParallelGraphExecution(const ir::Graph &graph,
                                     const ExecutionStrategy &exec_strategy,
                                     const BuildStrategy &build_strategy) const;
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index fd061d25920728a02f9504300429812160aa0461..552a5e0c3289b022041c6ea4f26694ed24aa858d 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1251,7 +1251,7 @@ All parameter, weight, gradient are variables in Paddle.
                     cannot be updated after being finalized.)DOC");
 
   pe.def(py::init<const std::vector<platform::Place> &,
-                  const std::unordered_set<std::string> &, const std::string &,
+                  const std::vector<std::string> &, const std::string &,
                   Scope *, std::vector<Scope *> &, const ExecutionStrategy &,
                   const BuildStrategy &, ir::Graph *>())
       // NOTE: even we return a vec<Scope*>* to Python use reference policy.
diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py
index 8f60f6f8b54f799b2495f92ac5b1914ed68387f7..2bf30e39d35e53cc3c61ec538d0d65ec1315b0f0 100644
--- a/python/paddle/fluid/compiler.py
+++ b/python/paddle/fluid/compiler.py
@@ -230,13 +230,17 @@ class CompiledProgram(object):
                 self._persistable_vars.append(cpt.to_text(node.name()))
 
         places = list(map(_place_obj, self._places))
-
-        return core.ParallelExecutor(places,
-                                     set(self._persistable_vars),
-                                     cpt.to_text(self._loss_name)
-                                     if self._loss_name else six.u(''), scope,
-                                     self._local_scopes, self._exec_strategy,
-                                     self._build_strategy, self._graph)
+        # ParallelExecutor would broadcast all the parameters during initialization.
+        # The parameters of each process should be in the same order for the
+        # data-parallel distributed training to keep the broadcast correct.
+        self._persistable_vars = list(set(self._persistable_vars))
+        self._persistable_vars.sort()
+
+        return core.ParallelExecutor(
+            places, self._persistable_vars,
+            cpt.to_text(self._loss_name)
+            if self._loss_name else six.u(''), self._scope, self._local_scopes,
+            self._exec_strategy, self._build_strategy, self._graph)
 
     def _compile_inference(self):
         return core.create_paddle_predictor(self._infer_config)
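
Note (editorial, not part of the patch): the compiler.py hunk hands the persistable variable names to core.ParallelExecutor as a sorted list rather than a set. Because ParallelExecutor now broadcasts parameters from trainer 0 to the other trainers, every process has to enumerate the variables in the same order; iteration order over a Python set of strings can differ between processes (e.g. under Python 3's per-process hash randomization), so two trainers could otherwise pair up mismatched tensors during the broadcast. A minimal standalone sketch of that rationale follows; the variable names are invented for illustration and none of this is Paddle API.

# Sketch: why the broadcast list is sorted before being passed to the executor.
# With PYTHONHASHSEED unset, the plain set order may vary from process to
# process, while sorted() yields the same order everywhere.
persistable_vars = {"fc_0.w_0", "fc_0.b_0", "batch_norm_0.w_1", "embedding_0.w_0"}

unstable_order = list(persistable_vars)   # may differ per process
stable_order = sorted(persistable_vars)   # deterministic across processes

print("set order    :", unstable_order)
print("sorted order :", stable_order)

Sorting once on the Python side keeps the C++ BCastParamsToDevices loop simple: it can walk the vector in index order on every trainer and rely on position alone to match the tensors being broadcast.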