From 01e2874a0e452f4f318a52955241811ae320bcfb Mon Sep 17 00:00:00 2001 From: ShenLiang Date: Tue, 22 Dec 2020 11:00:07 +0800 Subject: [PATCH] Support multi-stream communication for dynamic graph distributed (#29525) * fix fleet for multi-stream * fix memcpy for ncclid * use sync to solve move operation --- paddle/fluid/imperative/all_reduce.cc | 105 +++++++++--------- paddle/fluid/imperative/all_reduce.h | 9 +- paddle/fluid/imperative/nccl_context.cc | 64 +++++------ paddle/fluid/imperative/nccl_context.h | 10 +- paddle/fluid/imperative/reducer.cc | 91 +++++++++------ paddle/fluid/imperative/reducer.h | 12 +- .../imperative/tests/nccl_context_test.cc | 30 +++-- paddle/fluid/imperative/tests/test_group.cc | 4 +- paddle/fluid/pybind/imperative.cc | 8 +- .../distributed/fleet/base/fleet_base.py | 10 ++ python/paddle/distributed/parallel.py | 1 + python/paddle/fluid/dygraph/parallel.py | 30 ++++- 12 files changed, 235 insertions(+), 139 deletions(-) diff --git a/paddle/fluid/imperative/all_reduce.cc b/paddle/fluid/imperative/all_reduce.cc index 2c39ff6e86..8cebb35d4e 100644 --- a/paddle/fluid/imperative/all_reduce.cc +++ b/paddle/fluid/imperative/all_reduce.cc @@ -16,19 +16,27 @@ #include "paddle/fluid/imperative/all_reduce.h" -#include -#include - -#include "paddle/fluid/framework/scope.h" -#include "paddle/fluid/platform/device_context.h" -#include "paddle/fluid/platform/nccl_helper.h" -#include "paddle/fluid/string/string_helper.h" - namespace paddle { namespace imperative { +static const platform::Place &GetVarPlace(const framework::Variable &src) { + if (src.IsType()) { + return src.Get().place(); +#if NCCL_VERSION_CODE >= 2212 + } else if (src.IsType()) { + return src.Get().value().place(); +#endif + } else { + PADDLE_THROW(platform::errors::InvalidArgument( + "Cannot get unsupported variable type %s for imperative allreduce, " + "only " + "LoDTensor and SelectedRows are supported.", + platform::demangle(framework::ToTypeName(src.Type())))); + } +} static void AllReduce(const framework::Tensor &src, framework::Tensor *dst, - const ParallelStrategy &strategy, cudaStream_t stream) { + const cudaStream_t stream, + const platform::NCCLComm *comm) { const auto &place = src.place(); PADDLE_ENFORCE_EQ( platform::is_gpu_place(place), true, @@ -36,23 +44,20 @@ static void AllReduce(const framework::Tensor &src, framework::Tensor *dst, "Imperative mode does not support multi-CPU training yet.")); const void *src_ptr = src.data(); - dst->Resize(src.dims()); auto *dst_ptr = dst->mutable_data(src.place(), src.type()); - auto nccl_dtype = platform::ToNCCLDataType(src.type()); - auto comm = static_cast( - platform::DeviceContextPool::Instance().Get(place)) - ->nccl_comm(); - PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclAllReduce( - src_ptr, dst_ptr, src.numel(), nccl_dtype, ncclSum, comm, stream)); + src_ptr, dst_ptr, src.numel(), nccl_dtype, ncclSum, comm->comm(), + stream)); } #if NCCL_VERSION_CODE >= 2212 static void AllReduce(const framework::SelectedRows &src, framework::SelectedRows *dst, - const ParallelStrategy &strategy, cudaStream_t stream) { + const ParallelStrategy &strategy, + const cudaStream_t stream, + const platform::NCCLComm *comm) { VLOG(3) << "SelectedRows AllReduce start"; const auto &src_tensor = src.value(); const auto &place = src_tensor.place(); @@ -65,7 +70,8 @@ static void AllReduce(const framework::SelectedRows &src, auto nccl_dtype = platform::ToNCCLDataType(dtype); auto *dev_ctx = static_cast( platform::DeviceContextPool::Instance().Get(place)); - auto comm = 
dev_ctx->nccl_comm(); + + bool use_calc_stream = (dev_ctx->stream() == stream); // 1. Gather rows number from all workers. Here use ncclAllGather to do this, // but we can use other ways to implement is in the future @@ -74,12 +80,14 @@ static void AllReduce(const framework::SelectedRows &src, rows_num_vector[strategy.local_rank_] = static_cast(src_rows.size()); // CUDAMutableData use CalStream auto *gpu_rows_num_ptr = rows_num_vector.CUDAMutableData(place); - if (stream != dev_ctx->stream()) dev_ctx->Wait(); + if (!use_calc_stream) { + dev_ctx->Wait(); + } PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclAllGather( gpu_rows_num_ptr + strategy.local_rank_, gpu_rows_num_ptr, 1, ncclInt64, - comm, stream)); + comm->comm(), stream)); - if (stream != dev_ctx->stream()) { + if (!use_calc_stream) { PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); } @@ -108,19 +116,21 @@ static void AllReduce(const framework::SelectedRows &src, auto sizeof_dtype = framework::SizeOfType(dtype); int64_t row_offset = 0; - if (stream != dev_ctx->stream()) dev_ctx->Wait(); + if (!use_calc_stream) { + dev_ctx->Wait(); + } for (int i = 0; i < strategy.nranks_; ++i) { if (cpu_rows_num_ptr[i] > 0) { // 2. Broadcast the rows of SelectedRows PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclBroadcast( src_rows_ptr, dst_rows_ptr + row_offset, cpu_rows_num_ptr[i], - ncclInt64, i, comm, stream)); + ncclInt64, i, comm->comm(), stream)); // 3. Broadcast the tensor data of SelectedRows auto *dst_tensor_ptr_i = reinterpret_cast(dst_tensor_ptr) + row_offset * feature_size * sizeof_dtype; PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclBroadcast( src_tensor_ptr, dst_tensor_ptr_i, cpu_rows_num_ptr[i] * feature_size, - nccl_dtype, i, comm, stream)); + nccl_dtype, i, comm->comm(), stream)); row_offset += cpu_rows_num_ptr[i]; } } @@ -133,13 +143,21 @@ static void AllReduce(const framework::SelectedRows &src, #endif void AllReduce(const framework::Variable &src, framework::Variable *dst, - const ParallelStrategy &strategy, cudaStream_t stream) { + const ParallelStrategy &strategy, int ring_id, + bool use_calc_stream) { + const auto &place = GetVarPlace(src); + auto *dev_ctx = static_cast( + platform::DeviceContextPool::Instance().Get(place)); + platform::NCCLComm *comm = + platform::NCCLCommContext::Instance().Get(ring_id, place); + cudaStream_t stream = (use_calc_stream ? 
dev_ctx->stream() : comm->stream()); + if (src.IsType()) { if (!dst->IsType()) { dst->Clear(); } AllReduce(src.Get(), - dst->GetMutable(), strategy, stream); + dst->GetMutable(), stream, comm); #if NCCL_VERSION_CODE >= 2212 } else if (src.IsType()) { if (&src != dst) { @@ -147,13 +165,16 @@ void AllReduce(const framework::Variable &src, framework::Variable *dst, dst->Clear(); } AllReduce(src.Get(), - dst->GetMutable(), strategy, stream); + dst->GetMutable(), strategy, stream, + comm); } else { // SelectedRows cannot be allreduce in-place framework::Variable tmp_dst; AllReduce(src.Get(), - tmp_dst.GetMutable(), strategy, - stream); + tmp_dst.GetMutable(), strategy, stream, + comm); + // stream must synchronize to ensure accuracy of the move operation + PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream)); *dst = std::move(tmp_dst); } #endif @@ -165,33 +186,9 @@ void AllReduce(const framework::Variable &src, framework::Variable *dst, } } -static const platform::Place &GetVarPlace(const framework::Variable &src) { - if (src.IsType()) { - return src.Get().place(); -#if NCCL_VERSION_CODE >= 2212 - } else if (src.IsType()) { - return src.Get().value().place(); -#endif - } else { - PADDLE_THROW(platform::errors::InvalidArgument( - "Cannot get unsupported variable type %s for imperative allreduce, " - "only " - "LoDTensor and SelectedRows are supported.", - platform::demangle(framework::ToTypeName(src.Type())))); - } -} - void AllReduce(const framework::Variable &src, framework::Variable *dst, const ParallelStrategy &strategy) { - const auto &place = GetVarPlace(src); - PADDLE_ENFORCE_EQ( - platform::is_gpu_place(place), true, - platform::errors::Unimplemented( - "Imperative mode does not support multi-CPU training yet.")); - auto *dev_ctx = static_cast( - platform::DeviceContextPool::Instance().Get(place)); - auto stream = dev_ctx->stream(); - AllReduce(src, dst, strategy, stream); + AllReduce(src, dst, strategy, 0, true); } } // namespace imperative diff --git a/paddle/fluid/imperative/all_reduce.h b/paddle/fluid/imperative/all_reduce.h index bd94e78f46..7c6b77167b 100644 --- a/paddle/fluid/imperative/all_reduce.h +++ b/paddle/fluid/imperative/all_reduce.h @@ -19,11 +19,17 @@ #include #include #include +#include +#include #include "paddle/fluid/framework/lod_tensor.h" +#include "paddle/fluid/framework/scope.h" #include "paddle/fluid/framework/selected_rows.h" #include "paddle/fluid/framework/variable.h" #include "paddle/fluid/imperative/nccl_context.h" +#include "paddle/fluid/platform/device_context.h" +#include "paddle/fluid/platform/nccl_helper.h" +#include "paddle/fluid/string/string_helper.h" namespace paddle { namespace framework { @@ -40,7 +46,8 @@ void AllReduce(const framework::Variable &src, framework::Variable *dst, const ParallelStrategy &strategy); void AllReduce(const framework::Variable &src, framework::Variable *dst, - const ParallelStrategy &strategy, cudaStream_t stream); + const ParallelStrategy &strategy, int ring_id, + bool use_calc_stream); } // namespace imperative } // namespace paddle diff --git a/paddle/fluid/imperative/nccl_context.cc b/paddle/fluid/imperative/nccl_context.cc index e7c7b69370..7c9718e78a 100644 --- a/paddle/fluid/imperative/nccl_context.cc +++ b/paddle/fluid/imperative/nccl_context.cc @@ -17,8 +17,10 @@ namespace paddle { namespace imperative { #if defined(PADDLE_WITH_NCCL) -void NCCLParallelContext::RecvNCCLID(const std::string &ep, - ncclUniqueId *nccl_id) { +void NCCLParallelContext::RecvNCCLID( + const std::string &ep, + std::vector &nccl_ids) 
{ // NOLINT + int nrings = nccl_ids.size(); auto addr = paddle::string::Split(ep, ':'); PADDLE_ENFORCE_EQ( addr.size(), 2UL, @@ -85,14 +87,16 @@ void NCCLParallelContext::RecvNCCLID(const std::string &ep, } VLOG(3) << "recevived the ncclUniqueId"; - memcpy(nccl_id, buffer, NCCL_UNIQUE_ID_BYTES); + + memcpy(&nccl_ids[0], buffer, nrings * NCCL_UNIQUE_ID_BYTES); VLOG(3) << "closing the socket server: " << ep; close(server_fd); } -void NCCLParallelContext::SendNCCLID(const std::string &ep, - ncclUniqueId *nccl_id) { +void NCCLParallelContext::SendNCCLID( + const std::string &ep, const std::vector &nccl_ids) { + int nrings = nccl_ids.size(); auto addr = paddle::string::Split(ep, ':'); PADDLE_ENFORCE_EQ( addr.size(), 2UL, @@ -100,12 +104,12 @@ void NCCLParallelContext::SendNCCLID(const std::string &ep, "The endpoint should contain host and port, but got %s.", ep)); std::string host = addr[0]; int port = std::stoi(addr[1]); - // struct sockaddr_in address; int sock = 0; struct sockaddr_in serv_addr; char buffer[1024] = {0}; - memcpy(buffer, nccl_id, NCCL_UNIQUE_ID_BYTES); + memcpy(buffer, &nccl_ids[0], nrings * NCCL_UNIQUE_ID_BYTES); + if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) { PADDLE_THROW(platform::errors::Unavailable("Create socket failed.")); } @@ -149,40 +153,46 @@ void NCCLParallelContext::SendNCCLID(const std::string &ep, continue; } VLOG(3) << "sending the ncclUniqueId to " << ep; - send(sock, buffer, NCCL_UNIQUE_ID_BYTES, 0); + send(sock, buffer, NCCL_UNIQUE_ID_BYTES * nrings, 0); break; } close(sock); } -void NCCLParallelContext::BcastNCCLId(ncclUniqueId *nccl_id, int root) { +void NCCLParallelContext::BcastNCCLId( + std::vector &nccl_ids, // NOLINT + int root) { if (strategy_.local_rank_ == root) { for (auto ep : strategy_.trainer_endpoints_) { - if (ep != strategy_.current_endpoint_) SendNCCLID(ep, nccl_id); + if (ep != strategy_.current_endpoint_) SendNCCLID(ep, nccl_ids); } } else { - RecvNCCLID(strategy_.current_endpoint_, nccl_id); + RecvNCCLID(strategy_.current_endpoint_, nccl_ids); } } void NCCLParallelContext::Init() { - for (int ring_id = 0; ring_id < strategy_.nrings_; ring_id++) { - ncclUniqueId nccl_id; - if (strategy_.local_rank_ == 0) { - // generate the unique ncclid on the root worker - platform::dynload::ncclGetUniqueId(&nccl_id); - BcastNCCLId(&nccl_id, 0); - } else { - BcastNCCLId(&nccl_id, 0); + std::vector nccl_ids; + nccl_ids.resize(strategy_.nrings_); + if (strategy_.local_rank_ == 0) { + // generate the unique ncclid on the root worker + for (size_t i = 0; i < nccl_ids.size(); ++i) { + platform::dynload::ncclGetUniqueId(&nccl_ids[i]); } - int gpu_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device; + BcastNCCLId(nccl_ids, 0); + } else { + BcastNCCLId(nccl_ids, 0); + } + + int gpu_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device; + for (int ring_id = 0; ring_id < strategy_.nrings_; ring_id++) { VLOG(0) << "init nccl context nranks: " << strategy_.nranks_ << " local rank: " << strategy_.local_rank_ << " gpu id: " << gpu_id << " ring id: " << ring_id; - // it will assign nccl_comm in CUDADeviceContext within ring_id platform::NCCLCommContext::Instance().CreateNCCLComm( - &nccl_id, strategy_.nranks_, strategy_.local_rank_, gpu_id, ring_id); + &nccl_ids[ring_id], strategy_.nranks_, strategy_.local_rank_, gpu_id, + ring_id); } } @@ -193,15 +203,7 @@ void NCCLParallelContext::AllReduceByStream(const framework::Variable &src, platform::is_gpu_place(place_), true, platform::errors::Unimplemented( "Dynamic graph mode does not support multi-CPU 
training yet.")); - auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place_); - cudaStream_t stream = nullptr; - if (use_calc_stream) { - auto dev_ctx = platform::DeviceContextPool::Instance().Get(place_); - stream = static_cast(dev_ctx)->stream(); - } else { - stream = comm->stream(); - } - AllReduce(src, dst, strategy_, stream); + AllReduce(src, dst, strategy_, ring_id, use_calc_stream); } paddle::platform::CUDADeviceContext *NCCLParallelContext::GetDeviceContext( diff --git a/paddle/fluid/imperative/nccl_context.h b/paddle/fluid/imperative/nccl_context.h index ebb1b17643..b0e857a8df 100644 --- a/paddle/fluid/imperative/nccl_context.h +++ b/paddle/fluid/imperative/nccl_context.h @@ -73,6 +73,8 @@ class ParallelContext { int ring_id) = 0; #endif + inline int GetNRings() { return strategy_.nrings_; } + protected: ParallelStrategy strategy_; platform::Place place_; @@ -87,7 +89,7 @@ class NCCLParallelContext : public ParallelContext { ~NCCLParallelContext() {} - void BcastNCCLId(ncclUniqueId* nccl_id, int root); + void BcastNCCLId(std::vector& nccl_ids, int root); // NOLINT void Init() override; @@ -98,9 +100,11 @@ class NCCLParallelContext : public ParallelContext { paddle::platform::CUDADeviceContext* GetDeviceContext(int ring_id) override; protected: - void RecvNCCLID(const std::string& endpoint, ncclUniqueId* nccl_id); + void RecvNCCLID(const std::string& endpoint, + std::vector& nccl_ids); // NOLINT - void SendNCCLID(const std::string& endpoint, ncclUniqueId* nccl_id); + void SendNCCLID(const std::string& endpoint, + const std::vector& nccl_ids); }; #endif diff --git a/paddle/fluid/imperative/reducer.cc b/paddle/fluid/imperative/reducer.cc index 54a2b647d4..85f2831a06 100644 --- a/paddle/fluid/imperative/reducer.cc +++ b/paddle/fluid/imperative/reducer.cc @@ -68,7 +68,7 @@ void Group::SplitTensors(const platform::CUDADeviceContext &context) { std::ostream &operator<<(std::ostream &out, const Group &group) { const auto &vars = group.variable_indices_; - out << "numul: " << group.all_length_ << " ;is_sparse: " << group.is_sparse_ + out << "numel: " << group.all_length_ << " ;is_sparse: " << group.is_sparse_ << " ;var number: " << vars.size() << "\n"; auto begin = vars.begin(); auto end = vars.end(); @@ -95,6 +95,7 @@ Reducer::Reducer(const std::vector> &vars, parallel_ctx_(parallel_ctx), group_size_limits_(group_size_limits) { VLOG(3) << "Start construct the Reducer ..."; + nrings_ = parallel_ctx->GetNRings(); // initialize groups InitializeGroups(group_indices); for (size_t global_var_index = 0; global_var_index < vars_.size(); @@ -109,11 +110,13 @@ Reducer::Reducer(const std::vector> &vars, compute_stream_ = static_cast( platform::DeviceContextPool::Instance().Get(place_)) ->stream(); - comm_stream_ = platform::NCCLCommContext::Instance().Get(0, place_)->stream(); - // create events + for (int i = 0; i < nrings_; ++i) { + comm_streams_.emplace_back( + platform::NCCLCommContext::Instance().Get(i, place_)->stream()); + comm_events_.emplace_back(platform::CudaEventResourcePool::Instance().New( + BOOST_GET_CONST(platform::CUDAPlace, place_).device)); + } CreateGroupEvents(group_indices.size()); - comm_enent_ = platform::CudaEventResourcePool::Instance().New( - BOOST_GET_CONST(platform::CUDAPlace, place_).device); std::call_once(once_flag_, []() { std::atexit([]() { Reducer::GetInstance()->ReleaseReducer(); }); @@ -121,20 +124,22 @@ Reducer::Reducer(const std::vector> &vars, } void Reducer::ReleaseReducer() { - for (auto &event : events_) { + for (auto &event : group_events_) 
{ + event.reset(); + } + for (auto &event : comm_events_) { event.reset(); } - comm_enent_.reset(); } void Reducer::CreateGroupEvents(int group_num) { // release old events - for (auto &event : events_) { + for (auto &event : group_events_) { event.reset(); } - events_.clear(); - events_.resize(group_num); - for (auto &event : events_) { + group_events_.clear(); + group_events_.resize(group_num); + for (auto &event : group_events_) { event = platform::CudaEventResourcePool::Instance().New( BOOST_GET_CONST(platform::CUDAPlace, place_).device); } @@ -194,7 +199,7 @@ void Reducer::InitializeDenseGroups( // Each parameter will be initialized according to the group information. // For the sparse parameter, sparse_contents_ in the group directly points // to the parameter. For dense parameters, first construct an empty Tensor(). -// Then specify the actual memory in MarkVariableReady. +// Then specify the actual memory in MarkDenseVarReady. void Reducer::InitializeGroups( const std::vector> &group_indices) { VLOG(3) << "Start initialize groups .."; @@ -218,7 +223,6 @@ void Reducer::InitializeGroups( if (variable_indices_.size() == 1 && is_sparse_gradient_[variable_indices_.front()]) { // process the sparse gradient. one sparse, one group - group.sparse_contents_ = first_varbase->MutableGradVar(); group.dtype_ = first_varbase->DataType(); group.is_sparse_ = true; } else { @@ -232,7 +236,7 @@ void Reducer::InitializeGroups( // map variables to this group by VariableLocator size_t inside_group_index = 0; - for (const auto var_index : group_indices[group_index]) { + for (const auto var_index : variable_indices_) { variable_locators_[var_index] = VariableLocator{ .group_index = group_index, .inside_group_index = inside_group_index++, @@ -260,7 +264,7 @@ void Reducer::PrepareForBackward() { // Add hook function to each leaf node. When the gradient of a leaf node is // generated, if it is the sparse parameter, it will directly execute allreduce, // if it is the dense parameter, it will execute three steps: 1, -// MarkVariableReady. Find the position of the corresponding group +// MarkDenseVarReady. Find the position of the corresponding group // through var_index, share the gradient memory and the group dense_tensors, // the group counter is reduced by 1. 
2, MarkGroupReady: When the group // counter is 0, it means that allreduce can be emitted, and @@ -278,8 +282,11 @@ void Reducer::AddDistHook(VariableWrapper *var_warpper, size_t var_index) { if (!group.is_sparse_) { // Only dense_contents_ need memory copy - MarkVariableReady(var_index, var_warpper); + MarkDenseVarReady(var_index, var_warpper); + } else { + MarkSparseVarReady(var_index, var_warpper); } + if (--group.pending_ == 0) { // can start allreduce MarkGroupReady(group_index); @@ -290,7 +297,7 @@ void Reducer::AddDistHook(VariableWrapper *var_warpper, size_t var_index) { } } -void Reducer::MarkVariableReady(size_t var_index, +void Reducer::MarkDenseVarReady(size_t var_index, VariableWrapper *var_warpper) { const auto &var_locator = variable_locators_[var_index]; auto group_index = var_locator.group_index; @@ -303,6 +310,14 @@ void Reducer::MarkVariableReady(size_t var_index, {static_cast(length)}); } +void Reducer::MarkSparseVarReady(size_t var_index, + VariableWrapper *var_warpper) { + const auto &var_locator = variable_locators_[var_index]; + auto group_index = var_locator.group_index; + auto &group = groups_[group_index]; + group.sparse_contents_ = var_warpper->MutableVar(); +} + void Reducer::MarkGroupReady(size_t group_index) { if (group_index > next_group_) { VLOG(3) << "It will adjust the order of group in next batch automatically"; @@ -310,29 +325,35 @@ void Reducer::MarkGroupReady(size_t group_index) { } PADDLE_ENFORCE_CUDA_SUCCESS( - cudaEventRecord(events_[group_index].get(), compute_stream_)); - PADDLE_ENFORCE_CUDA_SUCCESS( - cudaStreamWaitEvent(comm_stream_, events_[group_index].get(), 0)); + cudaEventRecord(group_events_[group_index].get(), compute_stream_)); + for (int i = 0; i < nrings_; ++i) { + PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamWaitEvent( + comm_streams_[i], group_events_[group_index].get(), 0)); + } for (; next_group_ < groups_.size() && groups_[next_group_].pending_ == 0; ++next_group_) { auto &group = groups_[next_group_]; + int run_order = next_group_ % nrings_; if (group.is_sparse_) { - VLOG(3) << "sparse group [" << next_group_ << "] start allreduce..."; - parallel_ctx_->AllReduceByStream(*group.sparse_contents_, - group.sparse_contents_, 0, false); + VLOG(3) << "sparse group [" << next_group_ << "] start allreduce in ring[" + << run_order << "]"; + parallel_ctx_->AllReduceByStream( + *group.sparse_contents_, group.sparse_contents_, run_order, false); } else { - VLOG(3) << "dense group [" << next_group_ << "] start allreduce..."; + VLOG(3) << "dense group [" << next_group_ << "] start allreduce in ring[" + << run_order << "]"; // Select common commstream to concat tensors // group.dense_tensors ---> group.dense_contents_ - group.ConcatTensors(*parallel_ctx_->GetDeviceContext(0)); + group.ConcatTensors(*parallel_ctx_->GetDeviceContext(run_order)); // Start allreduce - parallel_ctx_->AllReduceByStream(group.dense_contents_, - &(group.dense_contents_), 0, false); + parallel_ctx_->AllReduceByStream( + group.dense_contents_, &(group.dense_contents_), run_order, false); + // Select common commstream to split tensors // group.dense_contents_ ---> group.dense_tensors - group.SplitTensors(*parallel_ctx_->GetDeviceContext(0)); + group.SplitTensors(*parallel_ctx_->GetDeviceContext(run_order)); } } } @@ -351,9 +372,16 @@ std::vector> Reducer::RebuildGruops() { } void Reducer::FinalizeBackward() { - PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(comm_enent_.get(), comm_stream_)); - PADDLE_ENFORCE_CUDA_SUCCESS( - cudaStreamWaitEvent(compute_stream_, comm_enent_.get(), 
0)); + // Must prevent compute_stream_ starting until all comm streams have finished + for (int i = 0; i < nrings_; ++i) { + PADDLE_ENFORCE_CUDA_SUCCESS( + cudaEventRecord(comm_events_[i].get(), comm_streams_[i])); + } + for (int i = 0; i < nrings_; ++i) { + PADDLE_ENFORCE_CUDA_SUCCESS( + cudaStreamWaitEvent(compute_stream_, comm_events_[i].get(), 0)); + } + if (!has_rebuilt_group_) { VLOG(3) << "Start rebuilding the groups"; auto rebuild_group_indices = RebuildGruops(); @@ -362,6 +390,7 @@ void Reducer::FinalizeBackward() { CreateGroupEvents(rebuild_group_number); InitializeGroups(group_indices_); } + VLOG(3) << "In the batch, Reducer is finished..."; } diff --git a/paddle/fluid/imperative/reducer.h b/paddle/fluid/imperative/reducer.h index 3e65685d5c..2bfc308de0 100644 --- a/paddle/fluid/imperative/reducer.h +++ b/paddle/fluid/imperative/reducer.h @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -133,7 +134,9 @@ class Reducer { void AddDistHook(VariableWrapper* var_warpper, size_t var_index); - void MarkVariableReady(size_t var_index, VariableWrapper* var_warpper); + void MarkDenseVarReady(size_t var_index, VariableWrapper* var_warpper); + + void MarkSparseVarReady(size_t var_index, VariableWrapper* var_warpper); void MarkGroupReady(size_t group_index); @@ -180,10 +183,11 @@ class Reducer { std::vector variable_locators_; // Following variables are to help sync stream - std::vector> events_; - std::shared_ptr comm_enent_; + std::vector> group_events_; + std::vector> comm_events_; cudaStream_t compute_stream_; - cudaStream_t comm_stream_; + std::vector comm_streams_; + int nrings_ = 1; // Following variables are to help rebuild group bool has_rebuilt_group_{false}; diff --git a/paddle/fluid/imperative/tests/nccl_context_test.cc b/paddle/fluid/imperative/tests/nccl_context_test.cc index e0d6950a97..649746a5bd 100644 --- a/paddle/fluid/imperative/tests/nccl_context_test.cc +++ b/paddle/fluid/imperative/tests/nccl_context_test.cc @@ -19,6 +19,7 @@ namespace imperative = paddle::imperative; namespace platform = paddle::platform; +int nrings = 2; imperative::ParallelStrategy GetStrategy(int local_rank) { std::vector eps = {"127.0.0.1:9866", "localhost:9867"}; imperative::ParallelStrategy strategy; @@ -26,27 +27,38 @@ imperative::ParallelStrategy GetStrategy(int local_rank) { strategy.current_endpoint_ = eps[local_rank]; strategy.nranks_ = 2; strategy.local_rank_ = local_rank; + strategy.nrings_ = nrings; return strategy; } #if defined(PADDLE_WITH_NCCL) -void BcastNCCLId(int local_rank, ncclUniqueId *nccl_id) { +void BcastNCCLId(int local_rank, std::vector* nccl_ids) { auto strategy = GetStrategy(local_rank); platform::CUDAPlace gpu(local_rank); imperative::NCCLParallelContext ctx(strategy, gpu); - ctx.BcastNCCLId(nccl_id, 0); + ctx.BcastNCCLId(*nccl_ids, 0); } TEST(BcastNCCLId, Run) { - ncclUniqueId nccl_id; - platform::dynload::ncclGetUniqueId(&nccl_id); - std::thread t(BcastNCCLId, 0, &nccl_id); + std::vector nccl_ids; + nccl_ids.resize(nrings); + for (int i = 0; i < nrings; ++i) { + platform::dynload::ncclGetUniqueId(&nccl_ids[i]); + } - ncclUniqueId recv_nccl_id; - BcastNCCLId(1, &recv_nccl_id); + std::thread t(BcastNCCLId, 0, &nccl_ids); + + std::vector recv_nccl_ids; + recv_nccl_ids.resize(nrings); + for (int i = 0; i < nrings; ++i) { + platform::dynload::ncclGetUniqueId(&recv_nccl_ids[i]); + } + BcastNCCLId(1, &recv_nccl_ids); t.join(); - EXPECT_EQ(0, std::memcmp(nccl_id.internal, recv_nccl_id.internal, - NCCL_UNIQUE_ID_BYTES)); + for (int i = 0; i < nrings; 
++i) { + EXPECT_EQ(0, std::memcmp(nccl_ids[i].internal, recv_nccl_ids[i].internal, + NCCL_UNIQUE_ID_BYTES)); + } } #endif diff --git a/paddle/fluid/imperative/tests/test_group.cc b/paddle/fluid/imperative/tests/test_group.cc index 2e967d296d..243f78704e 100644 --- a/paddle/fluid/imperative/tests/test_group.cc +++ b/paddle/fluid/imperative/tests/test_group.cc @@ -33,7 +33,7 @@ TEST(TestGroup, TestPrintGroupMessage) { std::stringstream stream1, stream2; stream1 << group; ASSERT_STREQ(stream1.str().c_str(), - "numul: 0 ;is_sparse: 0 ;var number: 0\n[]\n"); + "numel: 0 ;is_sparse: 0 ;var number: 0\n[]\n"); std::vector vars; size_t vars_num = 102; @@ -44,7 +44,7 @@ TEST(TestGroup, TestPrintGroupMessage) { group.all_length_ = 102; group.is_sparse_ = false; - std::string head = "numul: 102 ;is_sparse: 0 ;var number: 102\n"; + std::string head = "numel: 102 ;is_sparse: 0 ;var number: 102\n"; head = head + "["; auto begin = vars.begin(); auto end = vars.end(); diff --git a/paddle/fluid/pybind/imperative.cc b/paddle/fluid/pybind/imperative.cc index ec59eacef1..08af2f023c 100644 --- a/paddle/fluid/pybind/imperative.cc +++ b/paddle/fluid/pybind/imperative.cc @@ -1261,7 +1261,13 @@ void BindImperative(py::module *m_ptr) { return self.current_endpoint_; }, [](imperative::ParallelStrategy &self, - const std::string &ep) { self.current_endpoint_ = ep; }); + const std::string &ep) { self.current_endpoint_ = ep; }) + .def_property( + "nrings", + [](const imperative::ParallelStrategy &self) { return self.nrings_; }, + [](imperative::ParallelStrategy &self, int nrings) { + self.nrings_ = nrings; + }); m.def( "dygraph_partial_grad", diff --git a/python/paddle/distributed/fleet/base/fleet_base.py b/python/paddle/distributed/fleet/base/fleet_base.py index 1a4b79e6ae..cd6238c112 100644 --- a/python/paddle/distributed/fleet/base/fleet_base.py +++ b/python/paddle/distributed/fleet/base/fleet_base.py @@ -16,6 +16,7 @@ from __future__ import print_function import copy import warnings import paddle +import os from paddle.fluid.framework import dygraph_only from paddle.fluid import compiler from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase @@ -221,6 +222,15 @@ class Fleet(object): warnings.warn( "The dygraph parallel environment has been initialized.") else: + # FLAGS_nccl_nrings is used for dynamic graph multi-stream communication + if "FLAGS_nccl_nrings" in os.environ: + warnings.warn( + "You have set the environment variable FLAGS_nccl_nrings " + "outside the program, so the nccl_comm_num in " + "DistributedStrategy will not take effect here.") + else: + os.environ["FLAGS_nccl_nrings"] = str( + self._user_defined_strategy.nccl_comm_num) paddle.distributed.init_parallel_env() def is_first_worker(self): diff --git a/python/paddle/distributed/parallel.py b/python/paddle/distributed/parallel.py index ed016fdc17..be66e13aa1 100644 --- a/python/paddle/distributed/parallel.py +++ b/python/paddle/distributed/parallel.py @@ -166,6 +166,7 @@ def init_parallel_env(): strategy.local_rank = parallel_env.rank strategy.trainer_endpoints = parallel_env.trainer_endpoints strategy.current_endpoint = parallel_env.current_endpoint + strategy.nrings = parallel_env.nrings # NOTE(chenweihang): [ why config global place here? 
] # the dygraph mode will be set to default mode, diff --git a/python/paddle/fluid/dygraph/parallel.py b/python/paddle/fluid/dygraph/parallel.py index 731a9f809d..a9ed2f9f52 100644 --- a/python/paddle/fluid/dygraph/parallel.py +++ b/python/paddle/fluid/dygraph/parallel.py @@ -114,6 +114,11 @@ class ParallelEnv(object): self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS", "").split(",") self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "") + self._nrings = int(os.getenv("FLAGS_nccl_nrings", "1")) + assert self._nrings > 0, \ + "nccl_nrings must be an integer greater than 0." + assert self._nrings < 9, \ + "nccl_nrings should be less than 9, which is enough in most scenarios." @property def rank(self): @@ -211,6 +216,25 @@ class ParallelEnv(object): """ return self._trainer_endpoints + @property + def nrings(self): + """ + Nrings of current trainer. + + Its value is equal to the value of the environment variable ``FLAGS_nccl_nrings`` . The default value is 1. + + Examples: + .. code-block:: python + + # execute this command in terminal: export FLAGS_nccl_nrings=1 + import paddle.distributed as dist + + env = dist.ParallelEnv() + print("The nrings is %d" % env.nrings) + # the number of ring is 1 + """ + return self._nrings + # [aliases] Compatible with old method names local_rank = rank nranks = world_size @@ -397,8 +421,8 @@ class DataParallel(layers.Layer): else: warnings.warn("The program will return to single-card operation. " "Please check 1, whether you use spawn or fleetrun " - "to start the program. 2. Whether it is a multi-card " - "program. 3. Is the current environment multi-card.") + "to start the program. 2, Whether it is a multi-card " + "program. 3, Is the current environment multi-card.") def init_reducer(self): layers_param = [] @@ -424,7 +448,7 @@ class DataParallel(layers.Layer): if isinstance(sublayer, paddle.nn.layer.common.Embedding): return sublayer._sparse # NOTE(shenliang03):This is for compatibility. If paddle.fluid.dygraph.Embedding - # is removed in the future, the judgment will also be removed here. + # is removed in the future, the check will also be removed here. if isinstance(sublayer, paddle.fluid.dygraph.Embedding): return sublayer._is_sparse return False -- GitLab
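A minimal usage sketch of the multi-stream path this patch adds, assuming a two-GPU launch via fleetrun or paddle.distributed.spawn; the model, tensor sizes, and iteration count are arbitrary placeholders. ParallelEnv reads the ring count from FLAGS_nccl_nrings (default 1, asserted to be between 1 and 8), and fleet.init(is_collective=True) would otherwise set that flag from DistributedStrategy.nccl_comm_num when it is not already exported.

import os
os.environ.setdefault("FLAGS_nccl_nrings", "2")  # request two NCCL rings / comm streams

import paddle
import paddle.distributed as dist

def train():
    # Builds one NCCL communicator and one communication stream per ring
    # (ring_id 0 .. nrings-1), as set up in NCCLParallelContext::Init above.
    dist.init_parallel_env()
    model = paddle.DataParallel(paddle.nn.Linear(10, 10))
    opt = paddle.optimizer.SGD(parameters=model.parameters())
    for _ in range(3):
        loss = model(paddle.randn([4, 10])).mean()
        loss.backward()   # the reducer round-robins gradient groups over the rings
        opt.step()
        opt.clear_grad()

if __name__ == "__main__":
    train()  # e.g. launched with: python -m paddle.distributed.launch demo.py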