diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc index 2377f2c963d25bf52c66cd8c152fefeff48271ea..bb1f415128e58cc166a78e069fbf3a84c60951e6 100644 --- a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc @@ -28,7 +28,13 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor( places_(std::move(places)), graphs_(std::move(graphs)) { PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); - // do not use threadpool for each graph execution. + + // set the correct size of thread pool to each device. + strategy_.num_threads_ = strategy_.num_threads_ < places_.size() + ? 1UL + : strategy_.num_threads_ / places_.size(); + VLOG(1) << "set num_threads: " << strategy_.num_threads_ + << " to schedule operators on each device."; for (size_t i = 0; i < places.size(); ++i) { executors_.emplace_back(new details::ThreadedSSAGraphExecutor( strategy_, {local_scopes_[i]}, {places_[i]}, std::move(graphs_[i]))); diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 934cf34cbd49e457f8c37602c75502342350aab1..176c1db349c09bea8974db8428c2c7f905932ba6 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -21,10 +21,6 @@ limitations under the License. */ #include "paddle/fluid/framework/ir/graph.h" -#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) -#include "paddle/fluid/platform/nccl_helper.h" -#endif - #include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h" @@ -39,6 +35,8 @@ limitations under the License. */ DEFINE_string(pe_profile_fname, "", "Profiler filename for PE, which generated by gperftools." "Only valid when compiled `WITH_PRIFILER=ON`. Empty if disable."); +DEFINE_bool(enable_parallel_graph, true, + "Force disable parallel graph execution mode if set false."); namespace paddle { namespace framework { @@ -211,15 +209,6 @@ ParallelExecutor::ParallelExecutor( "the number of places must be greater than 1."); } - // FIXME(Yancey1989): parallel graph mode get better performance - // in GPU allreduce distributed training. Need an elegant way to - // choice the execution strategy. - build_strategy.enable_parallel_graph_ = - EnableParallelGraphExecution(main_program, exec_strategy, build_strategy); - - VLOG(1) << "Enable ParallelGraph Execution: " - << build_strategy.enable_parallel_graph_; - // Step 1. Bcast the bcast_vars to devs. // Create local scopes if (local_scopes.empty()) { @@ -236,24 +225,35 @@ ParallelExecutor::ParallelExecutor( } } + // FIXME(Yancey1989): parallel graph mode get better performance + // in GPU allreduce distributed training. Need an elegant way to + // choice the execution strategy. + build_strategy.enable_parallel_graph_ = + EnableParallelGraphExecution(main_program, exec_strategy, build_strategy); + + VLOG(1) << "Enable ParallelGraph Execution: " + << build_strategy.enable_parallel_graph_; + if (member_->use_cuda_) { // Bcast Parameters to all GPUs #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) + ncclUniqueId *nccl_id = nullptr; + // gen_nccl_id operator can broadcast the ncclUniqueId for nccl2 collective + // distributed training auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME); - std::unique_ptr nccl_id; - // nccl collective would broadcast ncclUniqueId by gen_nccl_id operator. if (nccl_id_var != nullptr) { - nccl_id.reset(nccl_id_var->GetMutable()); + nccl_id = nccl_id_var->GetMutable(); } if (build_strategy.enable_parallel_graph_ && member_->nranks_ > 1UL) { - if (nccl_id.get() == nullptr) { - nccl_id.reset(new ncclUniqueId()); - platform::dynload::ncclGetUniqueId(nccl_id.get()); + if (nccl_id == nullptr) { + local_nccl_id_.reset(new ncclUniqueId()); + platform::dynload::ncclGetUniqueId(local_nccl_id_.get()); + nccl_id = local_nccl_id_.get(); } } member_->nccl_ctxs_.reset(new platform::NCCLContextMap( - member_->places_, nccl_id.get(), num_trainers, trainer_id)); + member_->places_, nccl_id, num_trainers, trainer_id)); #else PADDLE_THROW("Not compiled with CUDA"); #endif @@ -492,7 +492,7 @@ bool ParallelExecutor::EnableParallelGraphExecution( if (build_strategy.enable_sequential_execution_ || exec_strategy.type_ == ExecutionStrategy::ExecutorType::kExperimental) enable_parallel_graph = false; - return enable_parallel_graph; + return enable_parallel_graph && FLAGS_enable_parallel_graph; } ParallelExecutor::~ParallelExecutor() { diff --git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index dc70894dbdb119fd7d5fe7a8581bb3b5a09fd6cc..49d3f0d3f6f2a8965d39b656071d86bde42bfd93 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -28,6 +28,10 @@ limitations under the License. */ #include "paddle/fluid/framework/tensor.h" #include "paddle/fluid/platform/device_context.h" +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +#include "paddle/fluid/platform/nccl_helper.h" +#endif + namespace paddle { namespace framework { @@ -73,6 +77,9 @@ class ParallelExecutor { const BuildStrategy &build_strategy) const; ParallelExecutorPrivate *member_; +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) + std::unique_ptr local_nccl_id_; +#endif }; } // namespace framework diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index d664107d57091946a09aabf659867d5ab3180cfd..1473603a7473948565b313333be64feea78d9544 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -810,7 +810,7 @@ All parameter, weight, gradient are variables in Paddle. If :math:`num\_threads=1`, all the operators will execute one by one, but the order maybe difference between iterations. If it is not set, it will be set in ParallelExecutor according to the - device type and device count, for GPU, :math:`num\_threads=device\_count*4`, for CPU, + device type and device count, for GPU, :math:`num\_threads=device\_count`, for CPU, :math:`num\_threads=CPU\_NUM*4`, the explanation of:math:`CPU\_NUM` is in ParallelExecutor. if it is not set, ParallelExecutor will get the cpu count by calling `multiprocessing.cpu_count()`. Default 0.)DOC") diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index c97a93ec36d4f4a7ff6a9f097551e2d21022d5b1..970996128686a70ae7204ec3eba4583fc8c02cd3 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -117,7 +117,7 @@ class ParallelExecutor(object): if use_cuda: # Experiments on se-resnext shows that too many threads hurt # performance. Worth tunning for other models in the future. - exec_strategy.num_threads = len(self._places) * 4 + exec_strategy.num_threads = len(self._places) else: cpu_num = int( os.environ.get('CPU_NUM', multiprocessing.cpu_count()))