From 65bbf950eef104a812b361a89d18b4f7aaab894c Mon Sep 17 00:00:00 2001 From: gongweibao Date: Mon, 27 May 2019 14:09:06 +0800 Subject: [PATCH] Add multi-ncclcomm and 2D ncclallreduce support. (#17263) --- .../framework/details/all_reduce_op_handle.cc | 31 +-- .../framework/details/all_reduce_op_handle.h | 17 +- .../fluid/framework/details/build_strategy.cc | 33 ++- .../fluid/framework/details/build_strategy.h | 13 +- .../fast_threaded_ssa_graph_executor.cc | 1 + .../details/fused_all_reduce_op_handle.cc | 22 +- .../details/fused_all_reduce_op_handle.h | 13 +- .../framework/details/multi_devices_helper.h | 1 + .../fluid/framework/details/nccl_op_handle.h | 234 ++++++++++++++++++ .../fluid/framework/details/op_handle_base.cc | 5 + .../details/parallel_ssa_graph_executor.cc | 1 + .../details/sparse_all_reduce_op_handle.cc | 5 +- .../details/sparse_all_reduce_op_handle.h | 2 +- .../all_reduce_deps_pass.cc | 40 ++- .../fuse_all_reduce_op_pass.cc | 25 +- .../multi_devices_graph_pass.cc | 12 +- .../multi_devices_graph_pass.h | 3 +- paddle/fluid/framework/parallel_executor.cc | 132 +++++++--- paddle/fluid/framework/parallel_executor.h | 3 - .../distributed_ops/gen_nccl_id_op.cc | 192 ++++++++++++-- paddle/fluid/platform/nccl_helper.h | 132 +++++++++- paddle/fluid/pybind/pybind.cc | 28 +++ python/paddle/fluid/compiler.py | 8 + python/paddle/fluid/framework.py | 4 + .../fluid/tests/unittests/test_dist_base.py | 14 +- .../fluid/tests/unittests/test_dist_mnist.py | 14 ++ .../fluid/transpiler/distribute_transpiler.py | 43 +++- 27 files changed, 862 insertions(+), 166 deletions(-) create mode 100644 paddle/fluid/framework/details/nccl_op_handle.h diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc index c9f06c64e..f1bd26e26 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc @@ -35,16 +35,9 @@ namespace details { AllReduceOpHandle::AllReduceOpHandle(ir::Node *node, const std::vector &local_scopes, const std::vector &places, - const platform::NCCLContextMap *ctxs) - : OpHandleBase(node), - local_scopes_(local_scopes), - places_(places), - nccl_ctxs_(ctxs) { - if (nccl_ctxs_) { - for (auto &p : places_) { - this->SetDeviceContext(p, nccl_ctxs_->DevCtx(p)); - } - } + const platform::MultiNCCLContextMap *ctxs) + : NCCLOpHandleBase(node, places, ctxs), local_scopes_(local_scopes) { + PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); } #else AllReduceOpHandle::AllReduceOpHandle(ir::Node *node, @@ -71,7 +64,9 @@ void AllReduceOpHandle::RunAllReduceFuncs( if (FLAGS_sync_nccl_allreduce) { for (auto &p : places_) { int dev_id = boost::get(p).device; - auto &nccl_ctx = nccl_ctxs_->at(dev_id); + auto *nccl_ctxs = + nccl_ctxs_->GetRunEnvNCCLCtx(run_order_, use_hierarchical_allreduce_); + auto &nccl_ctx = nccl_ctxs->at(dev_id); auto stream = nccl_ctx.stream(); cudaError_t e_sync = cudaStreamSynchronize(stream); if (e_sync != 0) { @@ -134,19 +129,9 @@ void AllReduceOpHandle::RunImpl() { numel = static_cast(lod_tensor.numel()); } - int dev_id = boost::get(p).device; - auto &nccl_ctx = nccl_ctxs_->at(dev_id); - auto stream = nccl_ctx.stream(); - auto comm = nccl_ctx.comm_; - - VLOG(10) << "before all reduce buffer:" << buffer << ", numel:" << numel - << ", dev_id:" << dev_id << ", dtype:" << dtype - << ", place:" << p; - all_reduce_calls.emplace_back([=] { - PADDLE_ENFORCE(platform::dynload::ncclAllReduce( - buffer, buffer, numel, static_cast(dtype), ncclSum, - 
comm, stream)); + NCCLAllReduce(p, buffer, buffer, numel, + static_cast(dtype), ncclSum); }); } RunAllReduceFuncs(all_reduce_calls); diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.h b/paddle/fluid/framework/details/all_reduce_op_handle.h index 3effd0a85..5ccf4291d 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.h +++ b/paddle/fluid/framework/details/all_reduce_op_handle.h @@ -21,6 +21,7 @@ #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +#include "paddle/fluid/framework/details/nccl_op_handle.h" #include "paddle/fluid/platform/nccl_helper.h" #endif @@ -28,13 +29,15 @@ namespace paddle { namespace framework { namespace details { -class AllReduceOpHandle : public OpHandleBase { - public: #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +class AllReduceOpHandle : public NCCLOpHandleBase { + public: AllReduceOpHandle(ir::Node *node, const std::vector &local_scopes, const std::vector &places, - const platform::NCCLContextMap *ctxs); + const platform::MultiNCCLContextMap *ctxs); #else +class AllReduceOpHandle : public OpHandleBase { + public: AllReduceOpHandle(ir::Node *node, const std::vector &local_scopes, const std::vector &places); #endif @@ -46,13 +49,17 @@ class AllReduceOpHandle : public OpHandleBase { protected: void RunImpl() override; - std::vector local_scopes_; + +#if !(defined(PADDLE_WITH_CUDA) && !defined(_WIN32)) + // NCCLOpHandleBase already have these attributes. + // Will polish it by class inheritance framework. std::vector places_; +#endif + #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) void RunAllReduceFuncs( const std::vector> &all_reduce_calls); - const platform::NCCLContextMap *nccl_ctxs_; #endif }; diff --git a/paddle/fluid/framework/details/build_strategy.cc b/paddle/fluid/framework/details/build_strategy.cc index b3710dea0..6e8be9553 100644 --- a/paddle/fluid/framework/details/build_strategy.cc +++ b/paddle/fluid/framework/details/build_strategy.cc @@ -256,16 +256,14 @@ bool BuildStrategy::IsMultiDevPass(const std::string &pass_name) const { return framework::ir::MultiDevSSAGraphBuilder().count(pass_name) > 0; } -ir::Graph *BuildStrategy::Apply(ir::Graph *graph, - const std::vector &places, - const std::string &loss_var_name, - const std::vector &local_scopes, - const size_t &nranks, +ir::Graph *BuildStrategy::Apply( + ir::Graph *graph, const std::vector &places, + const std::string &loss_var_name, const std::vector &local_scopes, + const size_t &nranks, #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - const bool use_cuda, - platform::NCCLContextMap *nccl_ctxs) const { + const bool use_cuda, platform::MultiNCCLContextMap *nccl_ctxs) const { #else - const bool use_cuda) const { + const bool use_cuda) const { #endif VLOG(3) << "apply all passes"; // Create a default one if not finalized by user. @@ -285,9 +283,9 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph, pass->Set(ir::kNRanks, new size_t(nranks)); #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - platform::NCCLContextMap *nctx = use_cuda ? nccl_ctxs : nullptr; + platform::MultiNCCLContextMap *nctx = use_cuda ? 
nccl_ctxs : nullptr; pass->Erase(kNCCLCtxs); - pass->SetNotOwned(kNCCLCtxs, nctx); + pass->SetNotOwned(kNCCLCtxs, nctx); #endif } else if (pass->Type() == "alloc_continuous_space_for_grad_pass" || pass->Type() == "fuse_adam_op_pass" || @@ -301,9 +299,12 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph, &local_scopes); if (pass->Type() == "fuse_all_reduce_op_pass") { #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - platform::NCCLContextMap *nctx = use_cuda ? nccl_ctxs : nullptr; + platform::MultiNCCLContextMap *nctx = use_cuda ? nccl_ctxs : nullptr; pass->Erase(kNCCLCtxs); - pass->SetNotOwned(kNCCLCtxs, nctx); + pass->SetNotOwned(kNCCLCtxs, nctx); + pass->Erase(kUseHierarchicalAllReduce); + pass->Set(kUseHierarchicalAllReduce, + new bool(use_hierarchical_allreduce_)); #endif } } else if (pass->Type() == "alloc_continuous_space_for_grad_pass") { @@ -316,6 +317,14 @@ ir::Graph *BuildStrategy::Apply(ir::Graph *graph, LOG(INFO) << "set enable_sequential_execution:" << enable_sequential_execution_; } else if (pass->Type() == "all_reduce_deps_pass") { +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) + platform::MultiNCCLContextMap *nctx = use_cuda ? nccl_ctxs : nullptr; + pass->Erase(kNCCLCtxs); + pass->SetNotOwned(kNCCLCtxs, nctx); + pass->Erase(kUseHierarchicalAllReduce); + pass->Set(kUseHierarchicalAllReduce, + new bool(use_hierarchical_allreduce_)); +#endif LOG(INFO) << "SeqOnlyAllReduceOps:" << SeqOnlyAllReduceOps(*this) << ", num_trainers:" << num_trainers_; } else if (pass->Type() == "fuse_relu_depthwise_conv_pass") { diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index e7117b49c..96a0fce12 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -111,6 +111,17 @@ struct BuildStrategy { bool cache_runtime_context_{false}; std::unordered_set mkldnn_enabled_op_types_; + size_t nccl_comm_num_{1}; + // The picture is here: + // https://github.com/PaddlePaddle/Paddle/pull/17263#discussion_r285411396 + bool use_hierarchical_allreduce_{false}; + // Nccl ranks in a node when use hierarchical allreduce, it's setted to gpu + // cards' number in most cases. + size_t hierarchical_allreduce_inter_nranks_{0}; + // Nccl ranks bewteen nodes when use hierarchical allreduce, it's setted to + // nodes number. + size_t hierarchical_allreduce_exter_nranks_{0}; + // NOTE: // Before you add new options, think if it's a general strategy that works // with other strategy. 
If not, the strategy should be created through @@ -136,7 +147,7 @@ struct BuildStrategy { const size_t &nranks, #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) const bool use_cuda, - platform::NCCLContextMap *nccl_ctxs) const; + platform::MultiNCCLContextMap *nccl_ctxs) const; #else const bool use_cuda) const; #endif diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc index 941f1c673..3da7e3555 100644 --- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc @@ -49,6 +49,7 @@ FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor( FeedFetchList FastThreadedSSAGraphExecutor::Run( const std::vector &fetch_tensors) { + VLOG(3) << "enter FastThreadedSSAGraphExecutor Run"; std::unique_ptr>> op_deps = atomic_op_deps_.get(); PrepareAtomicOpDeps(); diff --git a/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc b/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc index a57d670f1..b6cc9866b 100644 --- a/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/fused_all_reduce_op_handle.cc @@ -44,17 +44,10 @@ typedef std::vector>> FusedAllReduceOpHandle::FusedAllReduceOpHandle( ir::Node *node, const std::vector &local_scopes, const std::vector &places, const size_t num_of_all_reduce, - const platform::NCCLContextMap *ctxs) - : OpHandleBase(node), + const platform::MultiNCCLContextMap *ctxs) + : NCCLOpHandleBase(node, places, ctxs), local_scopes_(local_scopes), - places_(places), - num_of_all_reduce_(num_of_all_reduce), - nccl_ctxs_(ctxs) { - if (nccl_ctxs_) { - for (auto &p : places_) { - this->SetDeviceContext(p, nccl_ctxs_->DevCtx(p)); - } - } + num_of_all_reduce_(num_of_all_reduce) { PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); } #else @@ -167,14 +160,9 @@ void FusedAllReduceOpHandle::RunImpl() { auto &p = places_[i]; void *buffer = const_cast(lod_tensor_data.at(i)); - int dev_id = boost::get(p).device; - auto &nccl_ctx = nccl_ctxs_->at(dev_id); - auto stream = nccl_ctx.stream(); - auto comm = nccl_ctx.comm_; all_reduce_calls.emplace_back([=] { - PADDLE_ENFORCE(platform::dynload::ncclAllReduce( - buffer, buffer, numel, static_cast(nccl_dtype), - ncclSum, comm, stream)); + NCCLAllReduce(p, buffer, buffer, numel, + static_cast(nccl_dtype), ncclSum); }); } diff --git a/paddle/fluid/framework/details/fused_all_reduce_op_handle.h b/paddle/fluid/framework/details/fused_all_reduce_op_handle.h index 79772c61f..00730f107 100644 --- a/paddle/fluid/framework/details/fused_all_reduce_op_handle.h +++ b/paddle/fluid/framework/details/fused_all_reduce_op_handle.h @@ -21,6 +21,7 @@ #include "paddle/fluid/framework/lod_tensor.h" #include "paddle/fluid/framework/scope.h" #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +#include "paddle/fluid/framework/details/nccl_op_handle.h" #include "paddle/fluid/platform/nccl_helper.h" #endif @@ -28,14 +29,15 @@ namespace paddle { namespace framework { namespace details { -struct FusedAllReduceOpHandle : public OpHandleBase { #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) +struct FusedAllReduceOpHandle : public NCCLOpHandleBase { FusedAllReduceOpHandle(ir::Node *node, const std::vector &local_scopes, const std::vector &places, const size_t num_of_all_reduce, - const platform::NCCLContextMap *ctxs); + const platform::MultiNCCLContextMap *ctxs); #else +struct FusedAllReduceOpHandle : public OpHandleBase { 
FusedAllReduceOpHandle(ir::Node *node, const std::vector &local_scopes, const std::vector &places, @@ -52,11 +54,12 @@ struct FusedAllReduceOpHandle : public OpHandleBase { private: std::vector local_scopes_; +#if !(defined(PADDLE_WITH_CUDA) && !defined(_WIN32)) + // NCCLOpHandleBase already have these attributes. + // Will polish it by class inheritance framework. std::vector places_; - size_t num_of_all_reduce_; -#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - const platform::NCCLContextMap *nccl_ctxs_; #endif + size_t num_of_all_reduce_; // Check the dtype of the input void GetDTypeAndNumel( diff --git a/paddle/fluid/framework/details/multi_devices_helper.h b/paddle/fluid/framework/details/multi_devices_helper.h index 6e6ef074d..e97e5f439 100644 --- a/paddle/fluid/framework/details/multi_devices_helper.h +++ b/paddle/fluid/framework/details/multi_devices_helper.h @@ -45,6 +45,7 @@ constexpr char kGraphVars[] = "vars"; constexpr char kPlaces[] = "places"; constexpr char kLocalScopes[] = "local_scopes"; constexpr char kNCCLCtxs[] = "nccl_ctxs"; +constexpr char kUseHierarchicalAllReduce[] = "use_hierarchical_allreduce"; // aux variables to represent dependency. Useful to resolve data hazard. typedef std::unordered_set GraphDepVars; diff --git a/paddle/fluid/framework/details/nccl_op_handle.h b/paddle/fluid/framework/details/nccl_op_handle.h new file mode 100644 index 000000000..7f9de6e2f --- /dev/null +++ b/paddle/fluid/framework/details/nccl_op_handle.h @@ -0,0 +1,234 @@ +// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#pragma once + +#include +#include +#include + +#include "paddle/fluid/framework/details/op_handle_base.h" +#include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" +#include "paddle/fluid/platform/dynload/nccl.h" +#include "paddle/fluid/platform/nccl_helper.h" + +DECLARE_bool(sync_nccl_allreduce); + +namespace paddle { +namespace framework { +namespace details { + +class NCCLOpHandleBase : public OpHandleBase { + public: + NCCLOpHandleBase(ir::Node* node, const std::vector& places, + const platform::MultiNCCLContextMap* nccl_ctxs) + : OpHandleBase(node), places_(places), nccl_ctxs_(nccl_ctxs) { + if (nccl_ctxs == nullptr) { + return; + } + // init device context + auto default_nccl_ctxs = nccl_ctxs_->DefaultFlatCtx(); + for (auto& p : places_) { + this->SetDeviceContext(p, default_nccl_ctxs->DevCtx(p)); + } + } + virtual ~NCCLOpHandleBase() { + for (auto& ev : inter_events_) { + PADDLE_ENFORCE(cudaEventDestroy(ev.second)); + } + for (auto& ev : exter_events_) { + PADDLE_ENFORCE(cudaEventDestroy(ev.second)); + } + } + void SetRunEnv(int run_order, bool use_hierarchical_allreduce) { + PADDLE_ENFORCE(run_order >= 0, "run_order must >= 0"); + run_order_ = run_order; + use_hierarchical_allreduce_ = use_hierarchical_allreduce; + + VLOG(10) << "SetRunEnv " + << " run_order:" << run_order + << ", use_hierarchical_allreduce:" << use_hierarchical_allreduce; + + if (nccl_ctxs_ == nullptr) { + return; + } + + if (!use_hierarchical_allreduce_) { + auto ctxs = nccl_ctxs_->GetFlatCtx(run_order); + for (auto& p : places_) { + this->SetDeviceContext(p, ctxs->DevCtx(p)); + } + return; + } + + PADDLE_ENFORCE(places_.size() == 1, + "HierarchicalAllReduce run one proc with one card mode."); + + for (auto& p : places_) { + auto ctxs = nccl_ctxs_->GetHierarchicalInterCtx(run_order); + this->SetDeviceContext(p, ctxs->DevCtx(p)); + } + + for (auto& p : dev_ctxes_) { + int dev_id = boost::get(p.first).device; + if (inter_events_.find(dev_id) != inter_events_.end()) { + continue; + } + + PADDLE_ENFORCE(cudaSetDevice(dev_id)); + PADDLE_ENFORCE(cudaEventCreateWithFlags(&inter_events_[dev_id], + cudaEventDisableTiming)); + PADDLE_ENFORCE(cudaEventCreateWithFlags(&exter_events_[dev_id], + cudaEventDisableTiming)); + VLOG(10) << "Create events on dev_id:" << dev_id + << ", inter_event:" << &inter_events_[dev_id] + << ", exter_event:" << &exter_events_[dev_id]; + } + } + + void FlatNCCLAllReduce(platform::Place place, const void* sendbuff, + void* recvbuff, size_t count, ncclDataType_t datatype, + ncclRedOp_t op) { + PADDLE_ENFORCE(run_order_ >= 0, "run_order must > 0"); + auto flat_nccl_ctxs = nccl_ctxs_->GetFlatCtx(run_order_); + int dev_id = boost::get(place).device; + auto& nccl_ctx = flat_nccl_ctxs->at(dev_id); + auto stream = nccl_ctx.stream(); + auto comm = nccl_ctx.comm_; + + VLOG(10) << "before all reduce buffer:" << sendbuff << ", numel:" << count + << ", dev_id:" << dev_id << ", dtype:" << datatype + << ", place:" << place; + + PADDLE_ENFORCE(platform::dynload::ncclAllReduce( + sendbuff, recvbuff, count, datatype, op, comm, stream)); + } + + void NCCLAllReduce(platform::Place place, const void* sendbuff, + void* recvbuff, size_t count, ncclDataType_t datatype, + ncclRedOp_t op) { + PADDLE_ENFORCE(run_order_ >= 0, "run_order must > 0"); + if (!use_hierarchical_allreduce_) { + FlatNCCLAllReduce(place, sendbuff, recvbuff, count, datatype, op); + return; + } + + HierarchicalAllReduce(place, sendbuff, recvbuff, count, datatype, op); + } + + void 
HierarchicalAllReduce(platform::Place place, const void* sendbuff, + void* recvbuff, size_t count, + ncclDataType_t datatype, ncclRedOp_t op) { + PADDLE_ENFORCE(run_order_ >= 0, "run_order must > 0"); + InterReduce(place, sendbuff, recvbuff, count, datatype, op); + // When a trainer is not in exter allreduce ring + // they need not to call this. + if (nccl_ctxs_->NeedExterAllReduce()) { + ExterAllReduce(place, recvbuff, recvbuff, count, datatype, op); + } + InterBroadCast(place, recvbuff, count, datatype, op); + } + + protected: + void InterReduce(platform::Place place, const void* sendbuff, void* recvbuff, + size_t count, ncclDataType_t datatype, ncclRedOp_t op) { + auto nccl_ctxs = nccl_ctxs_->GetHierarchicalInterCtx(run_order_); + int dev_id = boost::get(place).device; + auto& nccl_ctx = nccl_ctxs->at(dev_id); + auto stream = nccl_ctx.stream(); + auto comm = nccl_ctx.comm_; + + VLOG(10) << "before all reduce" + << " run_order:" << run_order_ << ", buffer:" << sendbuff + << ", numel:" << count << ", dev_id:" << dev_id + << ", dtype:" << datatype << ", place:" << place + << ", stream:" << stream; + + PADDLE_ENFORCE(platform::dynload::ncclReduce( + sendbuff, recvbuff, count, datatype, ncclSum, 0, comm, stream)); + + cudaEventRecord(inter_events_.at(dev_id), stream); + + if (FLAGS_sync_nccl_allreduce) { + PADDLE_ENFORCE(cudaStreamSynchronize(stream), + "sync HierarchicalAllReduce inter stream error"); + } + } + + void ExterAllReduce(platform::Place place, const void* sendbuff, + void* recvbuff, size_t count, ncclDataType_t datatype, + ncclRedOp_t op) { + auto nccl_ctxs = nccl_ctxs_->GetHierarchicalExterCtx(run_order_); + PADDLE_ENFORCE(nccl_ctxs_, "can't get exter %d nccl_ctxs", run_order_); + int dev_id = boost::get(place).device; + auto& nccl_ctx = nccl_ctxs->at(dev_id); + auto stream = nccl_ctx.stream(); + auto comm = nccl_ctx.comm_; + + VLOG(10) << "before all reduce run_order:" << run_order_ + << "buffer:" << sendbuff << ", numel:" << count + << ", dev_id:" << dev_id << ", dtype:" << datatype + << ", place:" << place << ", stream:" << stream; + + cudaStreamWaitEvent(stream, inter_events_.at(dev_id), 0); + + PADDLE_ENFORCE(platform::dynload::ncclAllReduce( + sendbuff, recvbuff, count, datatype, op, comm, stream)); + + cudaEventRecord(exter_events_.at(dev_id), stream); + + if (FLAGS_sync_nccl_allreduce) { + PADDLE_ENFORCE(cudaStreamSynchronize(stream), + "sync HierarchicalAllReduce exter stream error"); + } + } + + void InterBroadCast(platform::Place place, void* sendbuff, size_t count, + ncclDataType_t datatype, ncclRedOp_t op) { + auto nccl_ctxs = nccl_ctxs_->GetHierarchicalInterCtx(run_order_); + int dev_id = boost::get(place).device; + auto& nccl_ctx = nccl_ctxs->at(dev_id); + auto stream = nccl_ctx.stream(); + auto comm = nccl_ctx.comm_; + + VLOG(10) << "before InterBroadCast buffer:" << sendbuff + << ", numel:" << count << ", dev_id:" << dev_id + << ", dtype:" << datatype << ", place:" << place + << ", stream:" << stream; + + cudaStreamWaitEvent(stream, exter_events_.at(dev_id), 0); + PADDLE_ENFORCE(platform::dynload::ncclBcast(sendbuff, count, datatype, 0, + comm, stream)); + } + + protected: + std::vector places_; + const platform::MultiNCCLContextMap* nccl_ctxs_{nullptr}; + // When multi trainer call collective function, they need run the same order. + // Or the program will hang.So we use allreduce_deps_pass to set this + // run_order_. + int run_order_{0}; + // Use 2d allreduce or not. 
+ bool use_hierarchical_allreduce_{false}; + + private: + // hierarchical needed events + std::unordered_map inter_events_; + std::unordered_map exter_events_; +}; + +} // namespace details +} // namespace framework +} // namespace paddle diff --git a/paddle/fluid/framework/details/op_handle_base.cc b/paddle/fluid/framework/details/op_handle_base.cc index 69cd84ebf..a3676ac0c 100644 --- a/paddle/fluid/framework/details/op_handle_base.cc +++ b/paddle/fluid/framework/details/op_handle_base.cc @@ -187,6 +187,11 @@ void OpHandleBase::RunAndRecordEvent(const std::function &callback) { std::function method = callback; for (auto &p : dev_ctxes_) { method = [method, p, this]() { + VLOG(10) << "cudadevicecontext:" + << static_cast(p.second) + << ", dev_id:" + << boost::get(p.first).device; + static_cast(p.second)->RecordEvent( events_.at(boost::get(p.first).device), method); diff --git a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc index 1bd27263f..68be353e3 100644 --- a/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/parallel_ssa_graph_executor.cc @@ -95,6 +95,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor( auto seq_allreduce_pass = ir::PassRegistry::Instance().Get("all_reduce_deps_pass"); + seq_allreduce_pass->Set(kUseHierarchicalAllReduce, new bool(false)); for (size_t i = 0; i < graphs_.size(); ++i) { graphs_[i].reset(seq_allreduce_pass->Apply(graphs_[i].release())); } diff --git a/paddle/fluid/framework/details/sparse_all_reduce_op_handle.cc b/paddle/fluid/framework/details/sparse_all_reduce_op_handle.cc index 1bdd33fd5..5c7d6db30 100644 --- a/paddle/fluid/framework/details/sparse_all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/sparse_all_reduce_op_handle.cc @@ -30,7 +30,7 @@ namespace details { SparseAllReduceOpHandle::SparseAllReduceOpHandle( ir::Node *node, const std::vector &local_scopes, const std::vector &places, - const platform::NCCLContextMap *ctxs, bool is_encoded, int nranks) + const platform::MultiNCCLContextMap *ctxs, bool is_encoded, int nranks) : AllReduceOpHandle(node, local_scopes, places, ctxs), is_encoded_(is_encoded), nranks_(nranks) { @@ -102,7 +102,8 @@ void SparseAllReduceOpHandle::RunImplEncoded() { out_numel = (out_numel == 0) ? 
static_cast(out.numel()) : out_numel; int dev_id = boost::get(place).device; - auto &nccl_ctx = nccl_ctxs_->at(dev_id); + auto *nccl_ctxs = nccl_ctxs_->GetRunEnvNCCLCtx(run_order_, false); + auto &nccl_ctx = nccl_ctxs->at(dev_id); auto stream = nccl_ctx.stream(); auto comm = nccl_ctx.comm_; diff --git a/paddle/fluid/framework/details/sparse_all_reduce_op_handle.h b/paddle/fluid/framework/details/sparse_all_reduce_op_handle.h index ed6be65a2..b3ff6cd39 100644 --- a/paddle/fluid/framework/details/sparse_all_reduce_op_handle.h +++ b/paddle/fluid/framework/details/sparse_all_reduce_op_handle.h @@ -32,7 +32,7 @@ class SparseAllReduceOpHandle : public AllReduceOpHandle { SparseAllReduceOpHandle(ir::Node *node, const std::vector &local_scopes, const std::vector &places, - const platform::NCCLContextMap *ctxs, + const platform::MultiNCCLContextMap *ctxs, bool is_encoded = false, int nranks = -1); std::string Name() const override; diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/all_reduce_deps_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/all_reduce_deps_pass.cc index 314f8c042..1019c4f84 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/all_reduce_deps_pass.cc +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/all_reduce_deps_pass.cc @@ -22,6 +22,7 @@ #include "paddle/fluid/framework/details/all_reduce_op_handle.h" #include "paddle/fluid/framework/details/container_cast.h" +#include "paddle/fluid/framework/details/fused_all_reduce_op_handle.h" #include "paddle/fluid/framework/details/multi_devices_helper.h" #include "paddle/fluid/framework/ir/graph.h" #include "paddle/fluid/framework/ir/graph_helper.h" @@ -35,9 +36,20 @@ namespace ir { class AllReduceDepsPass : public ir::Pass { protected: void ApplyImpl(ir::Graph* graph) const override { - std::vector all_reduce_op_handles = + std::vector all_reduce_op_handles = GetSortedAllReduceOps(*graph); +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) + auto use_hierarchical_allreduce = + Get(details::kUseHierarchicalAllReduce); + for (size_t i = 0; i < all_reduce_op_handles.size(); ++i) { + auto op_handle = + dynamic_cast(all_reduce_op_handles[i]); + PADDLE_ENFORCE(op_handle, "op_handle must be NCCLOpHandleBase"); + op_handle->SetRunEnv(i, use_hierarchical_allreduce); + } +#endif + for (size_t i = 1; i < all_reduce_op_handles.size(); ++i) { auto* dep_var = new details::DummyVarHandle(graph->CreateControlDepVar()); graph->Get(details::kGraphDepVars) @@ -51,13 +63,12 @@ class AllReduceDepsPass : public ir::Pass { } } - std::vector GetSortedAllReduceOps( + std::vector GetSortedAllReduceOps( const ir::Graph& graph) const { - std::vector all_reduce_op_handles; + std::vector all_reduce_op_handles; std::unordered_map pending_ops; std::unordered_set ready_ops; std::unordered_set next_ready_ops; - auto op_handles = ir::FilterByNodeWrapper(graph); size_t num_of_ops = op_handles.size(); for (details::OpHandleBase* op : op_handles) { @@ -95,13 +106,16 @@ class AllReduceDepsPass : public ir::Pass { void GetSortedAllReduceOps( const std::unordered_set& ready_ops, - std::vector* all_reduce_op_handles) const { - std::vector current_all_reduce_op_handles; + std::vector* all_reduce_op_handles) const { + std::vector current_all_reduce_op_handles; for (auto& op_handle : ready_ops) { auto all_reduce_op_handle = dynamic_cast(op_handle); - if (all_reduce_op_handle) { - current_all_reduce_op_handles.emplace_back(all_reduce_op_handle); + auto fused_all_reduce_op_handle = + dynamic_cast(op_handle); + + if (all_reduce_op_handle 
|| fused_all_reduce_op_handle) { + current_all_reduce_op_handles.emplace_back(op_handle); } } @@ -110,8 +124,8 @@ class AllReduceDepsPass : public ir::Pass { // Sort the current_all_reduce_op_handles according to the name of input. sort(current_all_reduce_op_handles.begin(), current_all_reduce_op_handles.end(), - [](const details::AllReduceOpHandle* left, - const details::AllReduceOpHandle* right) -> bool { + [](const details::OpHandleBase* left, + const details::OpHandleBase* right) -> bool { auto left_in_vars = details::DynamicCast(left->Inputs()); auto right_in_vars = @@ -126,9 +140,9 @@ class AllReduceDepsPass : public ir::Pass { current_all_reduce_op_handles.end()); } - void DebugString(const ir::Graph& graph, - const std::vector& - all_reduce_op_handles) const { + void DebugString( + const ir::Graph& graph, + const std::vector& all_reduce_op_handles) const { // get vars order std::map> vars = GetSoredGradientsFromStaleProgram(graph); diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc index a2b4c37ab..a7c492f0c 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/fuse_all_reduce_op_pass.cc @@ -34,7 +34,8 @@ class FuseAllReduceOpPass : public ir::Pass { auto &places = Get>(details::kPlaces); auto &local_scopes = Get>(details::kLocalScopes); #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - auto *nccl_ctxs = &Get(details::kNCCLCtxs); + auto *multi_nccl_ctxs = + &Get(details::kNCCLCtxs); #endif std::unordered_set grads; @@ -94,7 +95,7 @@ class FuseAllReduceOpPass : public ir::Pass { } #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) InsertFusedAllReduce(places, local_scopes, group_size, - group_all_reduce_ops, nccl_ctxs, &result); + group_all_reduce_ops, multi_nccl_ctxs, &result); #else InsertFusedAllReduce(places, local_scopes, group_size, group_all_reduce_ops, &result); @@ -102,14 +103,14 @@ class FuseAllReduceOpPass : public ir::Pass { } } - void InsertFusedAllReduce(const std::vector &places, - const std::vector &local_scopes, - const size_t num_of_all_reduce, - const std::vector &all_reduce_ops, + void InsertFusedAllReduce( + const std::vector &places, + const std::vector &local_scopes, const size_t num_of_all_reduce, + const std::vector &all_reduce_ops, #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - const platform::NCCLContextMap *nccl_ctxs, + const platform::MultiNCCLContextMap *multi_nccl_ctxs, #endif - ir::Graph *result) const { + ir::Graph *result) const { std::vector inputs; std::vector outputs; for (auto &op : all_reduce_ops) { @@ -135,7 +136,7 @@ class FuseAllReduceOpPass : public ir::Pass { #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places, - local_scopes, nccl_ctxs, result); + local_scopes, multi_nccl_ctxs, result); #else CreateFusedAllReduceOp(inputs, outputs, num_of_all_reduce, places, local_scopes, result); @@ -150,13 +151,13 @@ class FuseAllReduceOpPass : public ir::Pass { const std::vector &places, const std::vector &local_scopes, #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - const platform::NCCLContextMap *nccl_ctxs, + const platform::MultiNCCLContextMap *multi_nccl_ctxs, #endif ir::Graph *result) const { #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) auto *op_handle = new details::FusedAllReduceOpHandle( result->CreateEmptyNode("fused_all_reduce", 
ir::Node::Type::kOperation), - local_scopes, places, num_of_all_reduce, nccl_ctxs); + local_scopes, places, num_of_all_reduce, multi_nccl_ctxs); #else auto *op_handle = new details::FusedAllReduceOpHandle( result->CreateEmptyNode("fused_all_reduce", ir::Node::Type::kOperation), @@ -172,7 +173,7 @@ class FuseAllReduceOpPass : public ir::Pass { } #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - if (!nccl_ctxs) { + if (!multi_nccl_ctxs) { SetCommunicationContext(places, op_handle); } #else diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc index a4cb0599a..6127f6ac2 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.cc @@ -157,7 +157,11 @@ void MultiDevSSAGraphBuilderBase::Init() const { local_scopes_ = Get>(details::kLocalScopes); strategy_ = Get(kStrategy); #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - nccl_ctxs_ = &Get(details::kNCCLCtxs); + multi_nccl_ctxs_ = &Get(details::kNCCLCtxs); + nccl_ctxs_ = nullptr; + if (multi_nccl_ctxs_) { + nccl_ctxs_ = multi_nccl_ctxs_->DefaultFlatCtx(); + } #endif PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size()); } @@ -460,20 +464,20 @@ void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(ir::Graph *result, result->Get(kGraphOps).emplace_back( new details::SparseAllReduceOpHandle( result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), - scopes, places, nccl_ctxs_, is_encoded, + scopes, places, multi_nccl_ctxs_, is_encoded, static_cast(strategy_.trainers_endpoints_.size()) * places_.size())); } else { result->Get(kGraphOps).emplace_back( new details::AllReduceOpHandle( result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), - scopes, places, nccl_ctxs_)); + scopes, places, multi_nccl_ctxs_)); } #elif defined(PADDLE_WITH_CUDA) && !defined(_WIN32) result->Get(kGraphOps).emplace_back( new details::AllReduceOpHandle( result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation), - scopes, places, nccl_ctxs_)); + scopes, places, multi_nccl_ctxs_)); #else result->Get(kGraphOps).emplace_back( new details::AllReduceOpHandle( diff --git a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h index a377bbf6b..278621bf6 100644 --- a/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h +++ b/paddle/fluid/framework/ir/multi_devices_graph_pass/multi_devices_graph_pass.h @@ -96,7 +96,8 @@ class MultiDevSSAGraphBuilderBase : public ir::Pass { size_t device_id) const; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - mutable platform::NCCLContextMap *nccl_ctxs_; + mutable platform::NCCLContextMap *nccl_ctxs_{nullptr}; + mutable platform::MultiNCCLContextMap *multi_nccl_ctxs_{nullptr}; #endif mutable std::string loss_var_name_; diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index f400e8a5c..43b8645fb 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -94,6 +94,89 @@ class ParallelExecutorPrivate { } } +#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) + void InitNCCLCtxs(framework::Scope *scope, const BuildStrategy &bst) { + VLOG(1) << "nccl comm num:" << bst.nccl_comm_num_ << ", nranks:" << nranks_ + << ", num_trainers:" << bst.num_trainers_ + << ", 
trainer_id:" << bst.trainer_id_; + + if (bst.use_hierarchical_allreduce_) { + VLOG(1) << ", use_hierarchical_allreduce:" + << bst.use_hierarchical_allreduce_ << ", inter_trainers_num:" + << bst.hierarchical_allreduce_inter_nranks_ + << ", exter_trainers_num:" + << bst.hierarchical_allreduce_exter_nranks_; + } + + std::vector flat_nccl_ids; + if (nranks_ == 1) { + // FIXME(gongwb): need not to create ncclid when nranks==1 + nccl_ctxs_.InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_, + bst.trainer_id_); + return; + } + + if (bst.enable_parallel_graph_) { + VLOG(1) << "use only one ncclid in pg model"; + + ncclUniqueId *nccl_id = nullptr; + + std::string var_name = platform::GetFlatNCCLVarName(0); + auto nccl_id_var = scope->FindVar(var_name); + if (nccl_id_var) { + nccl_id = nccl_id_var->GetMutable(); + } else { + nccl_id = new ncclUniqueId(); + PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(nccl_id)); + } + + flat_nccl_ids.push_back(nccl_id); + + nccl_ctxs_.InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_, + bst.trainer_id_); + VLOG(1) << "init bst nccl context complete!"; + return; + } + + // num_trainers ==1 && places > 1 + if (bst.num_trainers_ == 1) { + nccl_ctxs_.InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_, + bst.trainer_id_); + return; + } + + for (int i = 0; i < static_cast(bst.nccl_comm_num_); i++) { + std::string var_name = platform::GetFlatNCCLVarName(i); + auto nccl_id_var = scope->FindVar(var_name); + PADDLE_ENFORCE(nccl_id_var, "can't find %s nccl_id_var", var_name); + auto nccl_id = nccl_id_var->GetMutable(); + flat_nccl_ids.push_back(nccl_id); + } + + nccl_ctxs_.InitFlatCtxs(places_, flat_nccl_ids, bst.num_trainers_, + bst.trainer_id_); + + if (bst.use_hierarchical_allreduce_) { + std::string var_name = platform::GetHierarchicalInterNCCLVarName(); + auto nccl_id_var = scope->FindVar(var_name); + auto inter_nccl_id = nccl_id_var->GetMutable(); + + std::vector exter_nccl_ids; + for (int i = 0; i < static_cast(bst.nccl_comm_num_); i++) { + std::string var_name = platform::GetHierarchicalExterNCCLVarName(i); + auto nccl_id_var = scope->FindVar(var_name); + PADDLE_ENFORCE(nccl_id_var, "can't find %s nccl_id_var", var_name); + auto nccl_id = nccl_id_var->GetMutable(); + exter_nccl_ids.push_back(nccl_id); + } + nccl_ctxs_.InitHierarchicalCtxs(places_, inter_nccl_id, exter_nccl_ids, + bst.num_trainers_, bst.trainer_id_, + bst.hierarchical_allreduce_inter_nranks_, + bst.hierarchical_allreduce_exter_nranks_); + } + } +#endif + BuildStrategy build_strategy_; std::vector places_; std::vector local_scopes_; @@ -101,7 +184,7 @@ class ParallelExecutorPrivate { std::unique_ptr executor_; #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - std::unique_ptr nccl_ctxs_; + platform::MultiNCCLContextMap nccl_ctxs_; #endif bool own_local_scope_; bool use_cuda_; @@ -254,24 +337,7 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, if (member_->use_cuda_) { // Bcast Parameters to all GPUs #if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - ncclUniqueId *nccl_id = nullptr; - // gen_nccl_id operator can broadcast the ncclUniqueId for nccl2 collective - // distributed training - auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME); - if (nccl_id_var != nullptr) { - nccl_id = nccl_id_var->GetMutable(); - } - if (build_strategy.enable_parallel_graph_ && member_->nranks_ > 1UL) { - if (nccl_id == nullptr) { - local_nccl_id_.reset(new ncclUniqueId()); - platform::dynload::ncclGetUniqueId(local_nccl_id_.get()); - nccl_id = local_nccl_id_.get(); - } - } - - 
member_->nccl_ctxs_.reset(new platform::NCCLContextMap( - member_->places_, nccl_id, build_strategy.num_trainers_, - build_strategy.trainer_id_)); + member_->InitNCCLCtxs(scope, build_strategy); // Initialize device context's nccl comm, will be used by normal // Operators like sync_batch_norm, and collective ops. @@ -280,22 +346,14 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, // NOTE: NCCL group-calls and non-group-calls can not use the same // NCCL communicator, so for ParallelGraph and Multi-Process mode, re-use // same communicators. - std::unique_ptr dev_nccl_ctxs; - if (nccl_id == nullptr) { - dev_nccl_ctxs.reset(new platform::NCCLContextMap(member_->places_)); - } for (size_t dev_id = 0; dev_id < member_->places_.size(); ++dev_id) { platform::DeviceContextPool &pool = platform::DeviceContextPool::Instance(); auto *dev_ctx = static_cast( pool.Get(member_->places_[dev_id])); - if (nccl_id != nullptr) { - auto &nccl_ctx = member_->nccl_ctxs_->at(member_->places_[dev_id]); - dev_ctx->set_nccl_comm(nccl_ctx.comm()); - } else { - auto &nccl_ctx = dev_nccl_ctxs->at(member_->places_[dev_id]); - dev_ctx->set_nccl_comm(nccl_ctx.comm()); - } + auto &nccl_ctx = + member_->nccl_ctxs_.DefaultFlatCtx()->at(member_->places_[dev_id]); + dev_ctx->set_nccl_comm(nccl_ctx.comm()); } #else PADDLE_THROW("Not compiled with CUDA"); @@ -327,18 +385,18 @@ ParallelExecutor::ParallelExecutor(const std::vector &places, VLOG(3) << "use local async mode"; graph = build_strategy.Apply(graph, {member_->places_[0]}, loss_var_name, {member_->local_scopes_[0]}, 1, - member_->use_cuda_, member_->nccl_ctxs_.get()); + member_->use_cuda_, &member_->nccl_ctxs_); for (size_t i = 1; i < member_->places_.size(); ++i) { graphs[i] = build_strategy.Apply(graphs[i], {member_->places_[i]}, loss_var_name, {member_->local_scopes_[i]}, 1, - member_->use_cuda_, member_->nccl_ctxs_.get()); + member_->use_cuda_, &member_->nccl_ctxs_); async_graphs[i] = graphs[i]; } } else { graph = build_strategy.Apply(graph, member_->places_, loss_var_name, member_->local_scopes_, member_->nranks_, - member_->use_cuda_, member_->nccl_ctxs_.get()); + member_->use_cuda_, &member_->nccl_ctxs_); } #else if (build_strategy.async_mode_) { @@ -471,13 +529,14 @@ void ParallelExecutor::BCastParamsToDevices( PADDLE_ENFORCE_EQ(member_->places_.size(), buffers.size(), "variables' buffer size to bcast NOT equal to places"); { + auto *nccl_ctxs = member_->nccl_ctxs_.DefaultFlatCtx(); platform::NCCLGroupGuard guard; for (size_t i = 0; i < member_->places_.size(); ++i) { - auto &nccl_ctx = member_->nccl_ctxs_->at(member_->places_[i]); + auto &nccl_ctx = nccl_ctxs->at(member_->places_[i]); platform::dynload::ncclBcast(buffers[i], numel, data_type, 0, nccl_ctx.comm_, nccl_ctx.stream()); } - member_->nccl_ctxs_->WaitAll(); + nccl_ctxs->WaitAll(); } #else PADDLE_THROW("Not compiled with CUDA"); @@ -512,6 +571,7 @@ void ParallelExecutor::BCastParamsToDevices( void ParallelExecutor::Run(const std::vector &fetch_tensors, const std::string &fetched_var_name) { + VLOG(3) << "enter ParallelExecutor Run"; #ifdef WITH_GPERFTOOLS if (gProfileStarted) { ProfilerFlush(); @@ -522,6 +582,8 @@ void ParallelExecutor::Run(const std::vector &fetch_tensors, if (member_->HasGarbageCollectors()) { member_->ResetRuntimeReferenceCount(fetch_tensors, fetched_var_name); } + + VLOG(3) << "ParallelExecutor begin to run member_->executor_->Run"; auto fetch_data = member_->executor_->Run(fetch_tensors); *member_->global_scope_->Var(fetched_var_name)->GetMutable() = fetch_data; diff 
--git a/paddle/fluid/framework/parallel_executor.h b/paddle/fluid/framework/parallel_executor.h index 2de6b7f73..bdd323bd1 100644 --- a/paddle/fluid/framework/parallel_executor.h +++ b/paddle/fluid/framework/parallel_executor.h @@ -87,9 +87,6 @@ class ParallelExecutor { ParallelExecutorPrivate *member_; std::vector> async_graphs_; -#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32) - std::unique_ptr local_nccl_id_; -#endif }; } // namespace framework diff --git a/paddle/fluid/operators/distributed_ops/gen_nccl_id_op.cc b/paddle/fluid/operators/distributed_ops/gen_nccl_id_op.cc index 80d712a0e..2446550ff 100644 --- a/paddle/fluid/operators/distributed_ops/gen_nccl_id_op.cc +++ b/paddle/fluid/operators/distributed_ops/gen_nccl_id_op.cc @@ -41,31 +41,129 @@ class GenNCCLIdOp : public framework::OperatorBase { // put nccl id in CPUPlace auto& dev_ctx = *pool.Get(platform::CPUPlace()); int trainer_id = Attr("trainer_id"); + + std::vector trainers = + Attr>("trainers"); + PADDLE_ENFORCE( + trainer_id >= 0 && trainer_id < static_cast(trainers.size()), + "trainer_id:%d must be in trainers.size range", trainer_id); + std::string endpoint = trainers[trainer_id]; + framework::Scope& local_scope = scope.NewScope(); + int nccl_comm_num = Attr("nccl_comm_num"); + int use_hierarchical_allreduce = Attr("use_hierarchical_allreduce"); + int inter_nranks = Attr("hierarchical_allreduce_inter_nranks"); + + int inter_trainer_id = -1; + int exter_trainer_id = -1; + if (use_hierarchical_allreduce) { + PADDLE_ENFORCE(trainers.size() > 1, "trainers.size():%llu < 1", + trainers.size()); + PADDLE_ENFORCE(inter_nranks > 1, "inter_nranks:%d < 1", inter_nranks); + PADDLE_ENFORCE((trainers.size() % inter_nranks == 0), + "trainers.size():%llu mod inter_nranks:%d != 0", + trainers.size(), inter_nranks); + + inter_trainer_id = trainer_id % inter_nranks; + + if (trainer_id % inter_nranks == 0) { + exter_trainer_id = trainer_id / inter_nranks; + } + } + + if (trainer_id != 0) { + GetIdByServer(endpoint, &local_scope, dev_ctx, nccl_comm_num, + use_hierarchical_allreduce, trainer_id, inter_trainer_id, + exter_trainer_id); + } + + std::ostringstream ss; + for (size_t i = 0; i < trainers.size(); i++) { + ss << trainers[i] << ","; + } + + VLOG(1) << "trainer_id:" << trainer_id + << ", use_hierarchical_allreduce:" << use_hierarchical_allreduce + << ", inter_nranks:" << inter_nranks + << ", inter_trainer_id:" << inter_trainer_id + << ", exter_trainer_id:" << exter_trainer_id + << ", trainers:" << ss.str(); + + // init flat if (trainer_id == 0) { - GenerateAndSend(&local_scope, dev_ctx); - } else { - GetIdByServer(&local_scope, dev_ctx); + std::vector flat_endpoints; + flat_endpoints.insert(flat_endpoints.begin(), trainers.begin() + 1, + trainers.end()); + // flat nccl_id + for (int i = 0; i < nccl_comm_num; i++) { + std::string var_name = platform::GetFlatNCCLVarName(i); + GenerateAndSend(&local_scope, dev_ctx, var_name, flat_endpoints); + } + } + + if (!use_hierarchical_allreduce) { + return; + } + + PADDLE_ENFORCE(trainers.size() % inter_nranks == 0, + "enpoints.size:%llu mod inter_nranks:%d should ==0", + trainers.size(), inter_nranks); + PADDLE_ENFORCE(inter_nranks > 1, "inter_nranks:%d must > 1", inter_nranks); + + // hierarchical inter ncclid + if (inter_trainer_id == 0) { + std::ostringstream ss; + ss << endpoint; + std::vector inter_endpoints; + for (int i = trainer_id + 1; i < trainer_id + inter_nranks && + i < static_cast(trainers.size()); + i++) { + ss << ","; + inter_endpoints.push_back(trainers[i]); + ss << trainers[i]; 
+ } + VLOG(1) << "Hierarchical inter ring endpoints:" << ss.str(); + std::string nccl_var_name = platform::GetHierarchicalInterNCCLVarName(); + GenerateAndSend(&local_scope, dev_ctx, nccl_var_name, inter_endpoints); + } + + // hierarchical exter ncclid + if (exter_trainer_id == 0) { + std::ostringstream ss; + std::vector exter_endpoints; + ss << endpoint; + for (size_t i = inter_nranks; i < trainers.size(); i += inter_nranks) { + ss << ","; + exter_endpoints.push_back(trainers[i]); + ss << trainers[i]; + } + VLOG(1) << "Hierarchical exter ring endpoints:" << ss.str(); + for (int i = 0; i < nccl_comm_num; i++) { + std::string nccl_var_name = + platform::GetHierarchicalExterNCCLVarName(i); + GenerateAndSend(&local_scope, dev_ctx, nccl_var_name, exter_endpoints); + } } } private: void GenerateAndSend(framework::Scope* scope, - const platform::DeviceContext& dev_ctx) const { - auto var = scope->FindVar(NCCL_ID_VARNAME); - PADDLE_ENFORCE_NOT_NULL(var); + const platform::DeviceContext& dev_ctx, + const std::string& nccl_id_name, + const std::vector& endpoint_list) const { + auto var = scope->FindVar(nccl_id_name); + PADDLE_ENFORCE_NOT_NULL(var, "can't find nccl_id_var_name:%s", + nccl_id_name); auto id = var->GetMutable(); PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(id)); - std::vector endpoint_list = - Attr>("endpoint_list"); distributed::RPCClient* client = distributed::RPCClient::GetInstance(0); for (auto& ep : endpoint_list) { - VLOG(3) << "sending nccl id to " << ep; - client->AsyncSendVar(ep, dev_ctx, *scope, NCCL_ID_VARNAME); + VLOG(3) << "sending nccl_id_var:" << nccl_id_name << " to " << ep; + client->AsyncSendVar(ep, dev_ctx, *scope, nccl_id_name); } client->Wait(); for (auto& ep : endpoint_list) { @@ -75,9 +173,11 @@ class GenNCCLIdOp : public framework::OperatorBase { VLOG(3) << "sending completed..."; } - void GetIdByServer(framework::Scope* scope, - const platform::DeviceContext& dev_ctx) const { - std::string endpoint = Attr("endpoint"); + void GetIdByServer(const std::string& endpoint, framework::Scope* scope, + const platform::DeviceContext& dev_ctx, int nccl_comm_num, + bool use_hierarchical_allreduce, int trainer_id, + int inter_trainer_id, int exter_trainer_id) const { + // std::string endpoint = Attr("endpoint"); // NOTE: Can not use unique_ptr here because the default // deleter will call GRPC Server's base class's dtor and // that will cause a wired crash. 
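
Illustrative sketch (not part of the patch): GenNCCLIdOp above derives each trainer's ring membership from trainer_id and hierarchical_allreduce_inter_nranks. inter_trainer_id = trainer_id % inter_nranks is the rank inside the node-local (inter) ring, and only trainers with trainer_id % inter_nranks == 0 get an exter_trainer_id and join the cross-node (exter) ring. A minimal Python sketch of that bookkeeping, assuming 8 trainers and inter_nranks = 4; the helper name and endpoints are hypothetical:

# Mirrors the membership math in GenNCCLIdOp (hypothetical helper, for illustration only).
def ring_membership(trainers, inter_nranks):
    roles = []
    for trainer_id, endpoint in enumerate(trainers):
        inter_trainer_id = trainer_id % inter_nranks  # rank inside the intra-node (inter) ring
        # Only ring leaders (local rank 0) get a rank in the cross-node (exter) ring.
        exter_trainer_id = trainer_id // inter_nranks if trainer_id % inter_nranks == 0 else -1
        roles.append((endpoint, inter_trainer_id, exter_trainer_id))
    return roles

trainers = ["10.0.0.%d:6170" % i for i in range(8)]  # hypothetical endpoints
for ep, inter_id, exter_id in ring_membership(trainers, inter_nranks=4):
    print(ep, "inter:", inter_id, "exter:", exter_id)
# Only trainers 0 and 4 (exter ids 0 and 1) join the exter ring.

Following the branches above, each ring leader (inter_trainer_id == 0) generates and sends the NCCL id for its own inter ring, while trainer 0 alone generates and distributes the flat and exter ids.
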
@@ -98,10 +198,42 @@ class GenNCCLIdOp : public framework::OperatorBase { std::thread server_thread( std::bind(&distributed::RPCServer::StartServer, rpc_service.get())); - rpc_service->SetCond(distributed::kRequestSend); - VLOG(3) << "start getting nccl id from trainer 0..."; - rpc_service->WaitBarrier(distributed::kRequestSend); - VLOG(3) << "got nccl id and stop server..."; + for (int i = 0; i < nccl_comm_num; i++) { + rpc_service->SetCond(distributed::kRequestSend); + VLOG(3) << "trainer_id:" << trainer_id + << " start getting nccl id from trainer 0, nccl_comm_no:" << i; + rpc_service->WaitBarrier(distributed::kRequestSend); + rpc_service->ResetBarrierCounter(); + } + + if (use_hierarchical_allreduce) { + if (inter_trainer_id > 0) { + rpc_service->SetCond(distributed::kRequestSend); + VLOG(3) << "trainer_id:" << trainer_id + << ", inter_trainer_id:" << inter_trainer_id + << " start getting nccl id from inter_trainer 0"; + rpc_service->WaitBarrier(distributed::kRequestSend); + rpc_service->ResetBarrierCounter(); + } + + if (exter_trainer_id > 0) { + for (int i = 0; i < nccl_comm_num; i++) { + rpc_service->SetCond(distributed::kRequestSend); + VLOG(3) + << "trainer_id:" << trainer_id + << ", exter_trainer_id:" << exter_trainer_id + << " start getting nccl id from exter_trainer 0, nccl_comm_no:" + << i; + rpc_service->WaitBarrier(distributed::kRequestSend); + rpc_service->ResetBarrierCounter(); + } + } + } + + VLOG(3) << "traier_id:" << trainer_id + << ", inter_trainer_id:" << inter_trainer_id + << ", exter_trainer_id:" << exter_trainer_id + << " got nccl id and stop server..."; rpc_service->ShutDown(); VLOG(3) << "rpc server stopped"; server_thread.join(); @@ -118,18 +250,26 @@ GenNCCLId operator For trainer 0: generate a new UniqueId and send it to all the other trainers. For trainer 1~n: start a gRPC server to get the UniqueId, once got, stop the server. )DOC"); - AddAttr("endpoint", - "(string), e.g. 127.0.0.1:6175 " - "current listen endpoint"); AddAttr>( - "endpoint_list", - "['trainer1_ip:port', 'trainer2_ip:port', ...] " - "list of trainer endpoints start from trainer 1") + "trainers", + "['trainer0_ip:port', 'trainer1_ip:port', ...] 
" + "list of all trainer endpoints") .SetDefault({}); AddAttr("trainer_id", - "(int default 0) " - "The index of the trainer in distributed training.") - .SetDefault(0); + "(int) " + "The index of the trainer in distributed training."); + AddAttr("nccl_comm_num", + "(int default 1) " + "The number of nccl communicator num.") + .SetDefault(1); + AddAttr("use_hierarchical_allreduce", + "(bool default false) " + "Wheter to use hierarchical allreduce.") + .SetDefault(false); + AddAttr("hierarchical_allreduce_inter_nranks", + "(int default 1) " + "Wheter to use hierarchical allreduce.") + .SetDefault(-1); } }; diff --git a/paddle/fluid/platform/nccl_helper.h b/paddle/fluid/platform/nccl_helper.h index b8b14b3d1..9eb617281 100644 --- a/paddle/fluid/platform/nccl_helper.h +++ b/paddle/fluid/platform/nccl_helper.h @@ -124,8 +124,8 @@ struct NCCLContextMap { } else { rank = trainer_id; } - VLOG(3) << "init nccl rank: " << rank << " nranks: " << nranks - << " gpu id: " << gpu_id; + VLOG(1) << "init nccl rank:" << rank << ", nranks:" << nranks + << ", gpu_id:" << gpu_id << ", dev_id:" << order_[i]; PADDLE_ENFORCE(cudaSetDevice(gpu_id)); PADDLE_ENFORCE(platform::dynload::ncclCommInitRank( comms.get() + i, nranks, *nccl_id, rank)); @@ -160,6 +160,134 @@ struct NCCLContextMap { } }; +inline std::string GetFlatNCCLVarName(size_t pos) { + if (pos == 0) { + return NCCL_ID_VARNAME; + } + return string::Sprintf("%s_%d", NCCL_ID_VARNAME, static_cast(pos)); +} + +inline std::string GetHierarchicalExterNCCLVarName(size_t pos) { + return string::Sprintf("Hierarchical_exter_%s_%d", NCCL_ID_VARNAME, + static_cast(pos)); +} +inline std::string GetHierarchicalInterNCCLVarName() { + return string::Sprintf("Hierarchical_inter_%s", NCCL_ID_VARNAME); +} + +class MultiNCCLContextMap { + public: + MultiNCCLContextMap() {} + virtual ~MultiNCCLContextMap() {} + + NCCLContextMap *DefaultFlatCtx() const { + if (flat_ctxs_.size() == 0) { + return nullptr; + } + + return flat_ctxs_[0].get(); + } + + std::vector> *GetFlatCtxs() { + return &flat_ctxs_; + } + + NCCLContextMap *GetFlatCtx(size_t run_order) const { + return flat_ctxs_[run_order % flat_ctxs_.size()].get(); + } + + NCCLContextMap *GetRunEnvNCCLCtx(size_t run_order, + bool use_hierarchical_allreduce) const { + if (!use_hierarchical_allreduce) { + return GetFlatCtx(run_order); + } + + return GetHierarchicalInterCtx(run_order); + } + + void InitFlatCtxs(const std::vector &places, + const std::vector &nccl_ids, + size_t trainers_num, size_t trainer_id) { + if (nccl_ids.size() == 0) { + auto ptr = new platform::NCCLContextMap(places); + VLOG(1) << "init local trainer"; + flat_ctxs_.emplace_back(ptr); + return; + } + + for (size_t i = 0; i < nccl_ids.size(); i++) { + auto ptr = new platform::NCCLContextMap(places, nccl_ids[i], trainers_num, + trainer_id); + VLOG(1) << "init trainer_id:" << trainer_id << ", comm no:" << i; + flat_ctxs_.emplace_back(ptr); + } + } + + void InitHierarchicalCtxs(const std::vector &places, + ncclUniqueId *inter_nccl_id, + const std::vector &exter_nccl_id, + size_t trainers_num, size_t trainer_id, + size_t inter_trainers_num, + size_t exter_trainers_num) { + PADDLE_ENFORCE(trainers_num == inter_trainers_num * exter_trainers_num, + "trainers_num:%llu != inter_trainers_num:%llu * " + "exter_trainers_num:%llu", + trainers_num, inter_trainers_num, exter_trainers_num); + + PADDLE_ENFORCE(inter_trainers_num > 1, "inter_trainers_num:%llu must > 1", + inter_trainers_num); + + int inter_trainer_id = trainer_id % inter_trainers_num; + VLOG(1) << "init 
inter_trainer_id:" << inter_trainer_id; + auto local = new NCCLContextMap(places, inter_nccl_id, inter_trainers_num, + inter_trainer_id); + + h_inter_ctxs_.emplace_back(local); + + int exter_trainer_id = -1; + if (trainer_id % inter_trainers_num == 0) { + exter_trainer_id = trainer_id / inter_trainers_num; + } + + if (exter_trainer_id >= 0) { + for (size_t i = 0; i < exter_nccl_id.size(); i++) { + auto ex = new NCCLContextMap(places, exter_nccl_id[i], + exter_trainers_num, exter_trainer_id); + VLOG(1) << "init exter_trainer_id:" << exter_trainer_id + << ", comm no:" << i; + h_exter_ctxs_.emplace_back(ex); + } + } + } + + bool NeedExterAllReduce() const { return h_exter_ctxs_.size() > 0; } + + NCCLContextMap *GetHierarchicalInterCtx(size_t run_order) const { + return h_inter_ctxs_[run_order % h_inter_ctxs_.size()].get(); + } + + NCCLContextMap *GetHierarchicalExterCtx(size_t run_order) const { + return h_exter_ctxs_[run_order % h_exter_ctxs_.size()].get(); + } + + std::vector> *GetHierarchicalInterCtxs() { + return &h_inter_ctxs_; + } + + std::vector> *GetHierarchicalExterCtxs() { + return &h_exter_ctxs_; + } + + protected: + // Support multi nccl comm on default nccl ring while NCCLContextMap can't. + std::vector> flat_ctxs_; + + // h_inter_ctxs_ and h_exter_ctxs_ are for 2d allreduce. + // And h_exter_ctxs_ can support multi comm too. + std::vector> h_inter_ctxs_; + std::vector> h_exter_ctxs_; +}; + } // namespace platform } // namespace paddle #endif diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc index 859928e37..64c95a14e 100644 --- a/paddle/fluid/pybind/pybind.cc +++ b/paddle/fluid/pybind/pybind.cc @@ -1483,6 +1483,34 @@ All parameter, weight, gradient are variables in Paddle. [](BuildStrategy &self, int trainer_id) { self.trainer_id_ = trainer_id; }) + .def_property( + "nccl_comm_num", + [](const BuildStrategy &self) { return self.nccl_comm_num_; }, + [](BuildStrategy &self, int nccl_comm_num) { + self.nccl_comm_num_ = nccl_comm_num; + }) + .def_property("use_hierarchical_allreduce_", + [](const BuildStrategy &self) { + return self.use_hierarchical_allreduce_; + }, + [](BuildStrategy &self, bool use) { + self.use_hierarchical_allreduce_ = use; + }) + .def_property("hierarchical_allreduce_inter_nranks_", + [](const BuildStrategy &self) { + return self.hierarchical_allreduce_inter_nranks_; + }, + [](BuildStrategy &self, int nranks) { + self.hierarchical_allreduce_inter_nranks_ = nranks; + }) + .def_property("hierarchical_allreduce_exter_nranks_", + [](const BuildStrategy &self) { + return self.hierarchical_allreduce_exter_nranks_; + }, + [](BuildStrategy &self, int nranks) { + self.hierarchical_allreduce_exter_nranks_ = nranks; + }) + .def_property( "fuse_elewise_add_act_ops", [](const BuildStrategy &self) { diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py index f01a6dd9d..09c3ecaf3 100644 --- a/python/paddle/fluid/compiler.py +++ b/python/paddle/fluid/compiler.py @@ -98,6 +98,7 @@ class CompiledProgram(object): def __init__(self, program_or_graph): if isinstance(program_or_graph, core.Graph): self._graph = program_or_graph + # don't not create a new program here. self._program = None elif isinstance(program_or_graph, framework.Program): self._graph = core.Graph(program_or_graph.desc) @@ -299,6 +300,7 @@ class CompiledProgram(object): # TODO(wuyi): trainer endpoings should be passed in through # build_strategy, not program.xxx. + # TODO(gongwb): let user to set them once. 
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 859928e37..64c95a14e 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1483,6 +1483,34 @@ All parameter, weight, gradient are variables in Paddle.
                     [](BuildStrategy &self, int trainer_id) {
                       self.trainer_id_ = trainer_id;
                     })
+      .def_property(
+          "nccl_comm_num",
+          [](const BuildStrategy &self) { return self.nccl_comm_num_; },
+          [](BuildStrategy &self, int nccl_comm_num) {
+            self.nccl_comm_num_ = nccl_comm_num;
+          })
+      .def_property("use_hierarchical_allreduce_",
+                    [](const BuildStrategy &self) {
+                      return self.use_hierarchical_allreduce_;
+                    },
+                    [](BuildStrategy &self, bool use) {
+                      self.use_hierarchical_allreduce_ = use;
+                    })
+      .def_property("hierarchical_allreduce_inter_nranks_",
+                    [](const BuildStrategy &self) {
+                      return self.hierarchical_allreduce_inter_nranks_;
+                    },
+                    [](BuildStrategy &self, int nranks) {
+                      self.hierarchical_allreduce_inter_nranks_ = nranks;
+                    })
+      .def_property("hierarchical_allreduce_exter_nranks_",
+                    [](const BuildStrategy &self) {
+                      return self.hierarchical_allreduce_exter_nranks_;
+                    },
+                    [](BuildStrategy &self, int nranks) {
+                      self.hierarchical_allreduce_exter_nranks_ = nranks;
+                    })
+
       .def_property(
           "fuse_elewise_add_act_ops",
           [](const BuildStrategy &self) {
diff --git a/python/paddle/fluid/compiler.py b/python/paddle/fluid/compiler.py
index f01a6dd9d..09c3ecaf3 100644
--- a/python/paddle/fluid/compiler.py
+++ b/python/paddle/fluid/compiler.py
@@ -98,6 +98,7 @@ class CompiledProgram(object):
     def __init__(self, program_or_graph):
         if isinstance(program_or_graph, core.Graph):
             self._graph = program_or_graph
+            # don't create a new program here.
             self._program = None
         elif isinstance(program_or_graph, framework.Program):
             self._graph = core.Graph(program_or_graph.desc)
@@ -299,6 +300,7 @@ class CompiledProgram(object):
         # TODO(wuyi): trainer endpoings should be passed in through
         # build_strategy, not program.xxx.
+        # TODO(gongwb): let users set them once.
         if self._program and self._build_strategy.num_trainers > 1 and \
                 self._program._trainers_endpoints:
             tps = self._program._trainers_endpoints
@@ -307,6 +309,12 @@ class CompiledProgram(object):
                 tps), "num_trainers == len(end_points)"
             self._build_strategy.trainers_endpoints = tps

+        if self._program:
+            self._build_strategy.nccl_comm_num = self._program._nccl_comm_num
+            self._build_strategy.use_hierarchical_allreduce_ = self._program._use_hierarchical_allreduce
+            self._build_strategy.hierarchical_allreduce_inter_nranks_ = self._program._hierarchical_allreduce_inter_nranks
+            self._build_strategy.hierarchical_allreduce_exter_nranks_ = self._program._hierarchical_allreduce_exter_nranks
+
         if self._build_strategy.sync_batch_norm:
             self._build_strategy.enable_sequential_execution = True

diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py
index 0991ea742..44c501ed1 100644
--- a/python/paddle/fluid/framework.py
+++ b/python/paddle/fluid/framework.py
@@ -2762,6 +2762,10 @@ class Program(object):
         # use Deep gradient comrepssion or not
         self._enable_dgc = False
+        self._nccl_comm_num = 1
+        self._use_hierarchical_allreduce = False
+        self._hierarchical_allreduce_inter_nranks = 0
+        self._hierarchical_allreduce_exter_nranks = 0

         # @deprecated(the python memory optimize transpiler is deprecated)
         # whether the program is optimized by memory_optimize_transpiler
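For context, a minimal user-side sketch (not part of the patch, assuming Paddle with this patch applied) of how these settings flow at compile time: the transpiler stores them as private attributes on the Program, and CompiledProgram copies them onto the BuildStrategy as shown above, so no extra wiring is needed when compiling.

    # Sketch: compiling an nccl2-transpiled program; the multi-comm and
    # hierarchical-allreduce settings are picked up from train_prog itself.
    import paddle.fluid as fluid

    def compile_for_nccl2(train_prog, loss_name):
        build_strategy = fluid.BuildStrategy()
        # nccl_comm_num / hierarchical allreduce knobs are copied from
        # train_prog._nccl_comm_num etc. inside CompiledProgram (see diff above).
        return fluid.CompiledProgram(train_prog).with_data_parallel(
            loss_name=loss_name, build_strategy=build_strategy)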
diff --git a/python/paddle/fluid/tests/unittests/test_dist_base.py b/python/paddle/fluid/tests/unittests/test_dist_base.py
index b479966d1..45192e405 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_base.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_base.py
@@ -51,11 +51,14 @@ class TestDistRunnerBase(object):
                        trainers,
                        sync_mode,
                        dc_asgd=False,
-                       current_endpoint=None):
+                       current_endpoint=None,
+                       nccl_comm_num=1):
         # NOTE: import fluid until runtime, or else forking processes will cause error.
         config = fluid.DistributeTranspilerConfig()
         config.enable_dc_asgd = dc_asgd
         config.sync_mode = sync_mode
+        if nccl_comm_num > 1:
+            config.nccl_comm_num = nccl_comm_num
         # config.runtime_split_send_recv = True
         t = fluid.DistributeTranspiler(config=config)
         t.transpile(
@@ -106,6 +109,7 @@ class TestDistRunnerBase(object):
             # transpile for nccl2
             config = fluid.DistributeTranspilerConfig()
             config.mode = "nccl2"
+            config.nccl_comm_num = args.nccl_comm_num
             nccl2_t = fluid.DistributeTranspiler(config=config)
             nccl2_t.transpile(
                 args.trainer_id,
@@ -113,6 +117,7 @@ class TestDistRunnerBase(object):
                 startup_program=fluid.default_startup_program(),
                 trainers=args.endpoints,
                 current_endpoint=args.current_endpoint)
+            trainer_prog = fluid.default_main_program()
         else:
             trainer_prog = fluid.default_main_program()

@@ -268,6 +273,7 @@ def runtime_main(test_class):
         choices=["pserver", "nccl2", "local", "nccl2_reduce_layer"])
     parser.add_argument('--trainer_id', type=int, required=False, default=0)
     parser.add_argument('--trainers', type=int, required=False, default=1)
+    parser.add_argument('--nccl_comm_num', type=int, required=False, default=1)
     parser.add_argument(
         '--current_endpoint', type=str, required=False, default="")
     parser.add_argument('--sync_mode', action='store_true')
@@ -345,6 +351,7 @@ class TestDistBase(unittest.TestCase):
         self._lr = 0.001
         self._use_dgc = False
         self._dygraph = False
+        self._nccl_comm_num = 1
         self._setup_config()
         self._after_setup_config()

@@ -590,6 +597,11 @@ class TestDistBase(unittest.TestCase):
         if self._use_dgc:
             tr0_cmd += " --use_dgc"
             tr1_cmd += " --use_dgc"
+
+        if self._nccl_comm_num > 1:
+            tr0_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
+            tr1_cmd += " --nccl_comm_num {}".format(self._nccl_comm_num)
+
         if self._mp_mode:
             env0 = {"FLAGS_selected_gpus": "0"}
             env1 = {"FLAGS_selected_gpus": "1"}
diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
index b9d2f6db3..c6672d404 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -39,6 +39,20 @@ class TestDistMnistNCCL2(TestDistBase):
             self.check_with_place("dist_mnist.py", delta=1e-5)


+class TestDistMnistNCCL2MultiNCCLComm(TestDistBase):
+    def _setup_config(self):
+        self._sync_mode = True
+        self._use_reduce = False
+        self._use_reader_alloc = False
+        self._nccl2_mode = True
+        self._nccl_comm_num = 3
+
+    def test_dist_train(self):
+        import paddle.fluid as fluid
+        if fluid.core.is_compiled_with_cuda():
+            self.check_with_place("dist_mnist.py", delta=1e-5)
+
+
 class TestDistMnistNCCL2DGC(TestDistBase):
     def _setup_config(self):
         self._sync_mode = True
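The test harness above threads the new flag from the command line into the transpiler config. A standalone sketch of that plumbing (not part of the patch; names are illustrative, assuming this patch is applied) looks like this:

    # Sketch: a launcher turning --nccl_comm_num into a transpiler config,
    # mirroring runtime_main() in test_dist_base.py.
    import argparse
    import paddle.fluid as fluid

    def build_nccl2_config(argv=None):
        parser = argparse.ArgumentParser()
        parser.add_argument('--nccl_comm_num', type=int, default=1)
        args = parser.parse_args(argv)

        config = fluid.DistributeTranspilerConfig()
        config.mode = "nccl2"
        config.nccl_comm_num = args.nccl_comm_num
        return config

    # build_nccl2_config(['--nccl_comm_num', '3']).nccl_comm_num == 3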
diff --git a/python/paddle/fluid/transpiler/distribute_transpiler.py b/python/paddle/fluid/transpiler/distribute_transpiler.py
index f6e7cce97..d7dde1f1c 100644
--- a/python/paddle/fluid/transpiler/distribute_transpiler.py
+++ b/python/paddle/fluid/transpiler/distribute_transpiler.py
@@ -165,6 +165,15 @@ class DistributeTranspilerConfig(object):
     runtime_split_send_recv = False
     sync_mode = True

+    nccl_comm_num = 1
+    # The picture here illustrates the principle:
+    # https://github.com/PaddlePaddle/Paddle/pull/17263#discussion_r285411396
+    use_hierarchical_allreduce = False
+    # NCCL ranks in a node when using hierarchical allreduce; usually set to the number of GPU cards per node.
+    hierarchical_allreduce_inter_nranks = 0
+    # NCCL ranks between nodes when using hierarchical allreduce; usually set to the number of nodes.
+    hierarchical_allreduce_exter_nranks = 0
+

 class DistributeTranspiler(object):
     """
@@ -261,14 +270,36 @@ class DistributeTranspiler(object):
             nccl_id_var = startup_program.global_block().create_var(
                 name="NCCLID", persistable=True, type=core.VarDesc.VarType.RAW)
+
+            for i in range(1, self.config.nccl_comm_num):
+                startup_program.global_block().create_var(
+                    name="NCCLID_{}".format(i),
+                    persistable=True,
+                    type=core.VarDesc.VarType.RAW)
+
+            if self.config.use_hierarchical_allreduce:
+                startup_program.global_block().create_var(
+                    name="Hierarchical_inter_NCCLID",
+                    persistable=True,
+                    type=core.VarDesc.VarType.RAW)
+                for i in range(0, self.config.nccl_comm_num):
+                    startup_program.global_block().create_var(
+                        name="Hierarchical_exter_NCCLID_{}".format(i),
+                        persistable=True,
+                        type=core.VarDesc.VarType.RAW)
+
             startup_program.global_block().append_op(
                 type="gen_nccl_id",
                 inputs={},
                 outputs={"NCCLID": nccl_id_var},
                 attrs={
-                    "endpoint": current_endpoint,
-                    "endpoint_list": worker_endpoints,
-                    "trainer_id": trainer_id
+                    "trainers": trainers.split(","),
+                    "trainer_id": trainer_id,
+                    "nccl_comm_num": self.config.nccl_comm_num,
+                    "use_hierarchical_allreduce":
+                    self.config.use_hierarchical_allreduce,
+                    "hierarchical_allreduce_inter_nranks":
+                    self.config.hierarchical_allreduce_inter_nranks
                 })
             return nccl_id_var
         else:
@@ -350,6 +381,12 @@ class DistributeTranspiler(object):
         if self.config.mode == "nccl2":
             assert (isinstance(trainers, str))
             self.origin_program._trainers_endpoints = trainers.split(",")
+            self.origin_program._nccl_comm_num = self.config.nccl_comm_num
+            self.origin_program._use_hierarchical_allreduce = self.config.use_hierarchical_allreduce
+            self.origin_program._hierarchical_allreduce_inter_nranks = \
+                int(self.config.hierarchical_allreduce_inter_nranks)
+            self.origin_program._hierarchical_allreduce_exter_nranks = \
+                int(self.config.hierarchical_allreduce_exter_nranks)
             self._transpile_nccl2(
                 trainer_id,
                 trainers,
--
GitLab
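Putting the pieces together, here is a hedged end-to-end sketch (not part of the patch) of enabling multi-comm and hierarchical 2D allreduce through the transpiler, assuming this patch is applied. The endpoints and cluster shape (4 nodes x 8 GPUs = 32 trainers) are illustrative only; in practice the full 32-entry endpoint list would be passed.

    # Sketch: configuring nccl2 training with 2 flat rings plus 2D allreduce.
    import paddle.fluid as fluid

    config = fluid.DistributeTranspilerConfig()
    config.mode = "nccl2"
    config.nccl_comm_num = 2                           # two independent flat rings
    config.use_hierarchical_allreduce = True
    config.hierarchical_allreduce_inter_nranks = 8     # GPU cards per node
    config.hierarchical_allreduce_exter_nranks = 4     # number of nodes

    t = fluid.DistributeTranspiler(config=config)
    t.transpile(
        trainer_id=0,
        trainers="192.168.0.1:6170,192.168.0.2:6170",  # illustrative; full list in practice
        current_endpoint="192.168.0.1:6170",
        startup_program=fluid.default_startup_program())
    # The transpiler records the settings on the Program; CompiledProgram then
    # forwards them to BuildStrategy, as shown in the compiler.py hunk above.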