Unverified commit 01e2874a, authored by ShenLiang, committed by GitHub

Support multi-stream communication for dynamic graph distributed training (#29525)

* fix fleet for multi-stream

* fix memcpy for ncclid

* use sync to solve move operation
Parent f350aa59
......@@ -16,19 +16,27 @@
#include "paddle/fluid/imperative/all_reduce.h"
#include <string>
#include <utility>
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/nccl_helper.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace imperative {
static const platform::Place &GetVarPlace(const framework::Variable &src) {
if (src.IsType<framework::LoDTensor>()) {
return src.Get<framework::LoDTensor>().place();
#if NCCL_VERSION_CODE >= 2212
} else if (src.IsType<framework::SelectedRows>()) {
return src.Get<framework::SelectedRows>().value().place();
#endif
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Cannot get unsupported variable type %s for imperative allreduce, "
"only "
"LoDTensor and SelectedRows are supported.",
platform::demangle(framework::ToTypeName(src.Type()))));
}
}
static void AllReduce(const framework::Tensor &src, framework::Tensor *dst,
const ParallelStrategy &strategy, cudaStream_t stream) {
const cudaStream_t stream,
const platform::NCCLComm *comm) {
const auto &place = src.place();
PADDLE_ENFORCE_EQ(
platform::is_gpu_place(place), true,
......@@ -36,23 +44,20 @@ static void AllReduce(const framework::Tensor &src, framework::Tensor *dst,
"Imperative mode does not support multi-CPU training yet."));
const void *src_ptr = src.data<void>();
dst->Resize(src.dims());
auto *dst_ptr = dst->mutable_data(src.place(), src.type());
auto nccl_dtype = platform::ToNCCLDataType(src.type());
auto comm = static_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(place))
->nccl_comm();
PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclAllReduce(
src_ptr, dst_ptr, src.numel(), nccl_dtype, ncclSum, comm, stream));
src_ptr, dst_ptr, src.numel(), nccl_dtype, ncclSum, comm->comm(),
stream));
}
#if NCCL_VERSION_CODE >= 2212
static void AllReduce(const framework::SelectedRows &src,
framework::SelectedRows *dst,
const ParallelStrategy &strategy, cudaStream_t stream) {
const ParallelStrategy &strategy,
const cudaStream_t stream,
const platform::NCCLComm *comm) {
VLOG(3) << "SelectedRows AllReduce start";
const auto &src_tensor = src.value();
const auto &place = src_tensor.place();
......@@ -65,7 +70,8 @@ static void AllReduce(const framework::SelectedRows &src,
auto nccl_dtype = platform::ToNCCLDataType(dtype);
auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(place));
auto comm = dev_ctx->nccl_comm();
bool use_calc_stream = (dev_ctx->stream() == stream);
// 1. Gather the number of rows from all workers. Here ncclAllGather is used,
// but other implementations could be used in the future
......@@ -74,12 +80,14 @@ static void AllReduce(const framework::SelectedRows &src,
rows_num_vector[strategy.local_rank_] = static_cast<int64_t>(src_rows.size());
// CUDAMutableData uses the calc stream
auto *gpu_rows_num_ptr = rows_num_vector.CUDAMutableData(place);
if (stream != dev_ctx->stream()) dev_ctx->Wait();
if (!use_calc_stream) {
dev_ctx->Wait();
}
PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclAllGather(
gpu_rows_num_ptr + strategy.local_rank_, gpu_rows_num_ptr, 1, ncclInt64,
comm, stream));
comm->comm(), stream));
if (stream != dev_ctx->stream()) {
if (!use_calc_stream) {
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream));
}
......@@ -108,19 +116,21 @@ static void AllReduce(const framework::SelectedRows &src,
auto sizeof_dtype = framework::SizeOfType(dtype);
int64_t row_offset = 0;
if (stream != dev_ctx->stream()) dev_ctx->Wait();
if (!use_calc_stream) {
dev_ctx->Wait();
}
for (int i = 0; i < strategy.nranks_; ++i) {
if (cpu_rows_num_ptr[i] > 0) {
// 2. Broadcast the rows of SelectedRows
PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclBroadcast(
src_rows_ptr, dst_rows_ptr + row_offset, cpu_rows_num_ptr[i],
ncclInt64, i, comm, stream));
ncclInt64, i, comm->comm(), stream));
// 3. Broadcast the tensor data of SelectedRows
auto *dst_tensor_ptr_i = reinterpret_cast<uint8_t *>(dst_tensor_ptr) +
row_offset * feature_size * sizeof_dtype;
PADDLE_ENFORCE_CUDA_SUCCESS(platform::dynload::ncclBroadcast(
src_tensor_ptr, dst_tensor_ptr_i, cpu_rows_num_ptr[i] * feature_size,
nccl_dtype, i, comm, stream));
nccl_dtype, i, comm->comm(), stream));
row_offset += cpu_rows_num_ptr[i];
}
}
......@@ -133,13 +143,21 @@ static void AllReduce(const framework::SelectedRows &src,
#endif
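For context, the SelectedRows path above merges variable-length row sets from all ranks: the per-rank row counts are allgathered first, and then each rank's rows and values are broadcast into a contiguous destination at a running offset. The following is a minimal host-side sketch of that offset bookkeeping with hypothetical counts and no NCCL calls; it is an illustration, not Paddle code.

#include <cstdint>
#include <cstdio>
#include <numeric>
#include <vector>

int main() {
  // Hypothetical gathered row counts of 3 ranks (cpu_rows_num_ptr above).
  std::vector<int64_t> rows_num = {4, 0, 2};
  const int64_t feature_size = 8;  // hypothetical width of each row

  // The destination SelectedRows holds the concatenation of all ranks' rows.
  const int64_t total_rows =
      std::accumulate(rows_num.begin(), rows_num.end(), int64_t{0});
  std::printf("dst rows: %lld, dst values: %lld\n",
              static_cast<long long>(total_rows),
              static_cast<long long>(total_rows * feature_size));

  // Rank i's rows and values land at row_offset, mirroring the ncclBroadcast
  // loop above; ranks that contribute no rows are skipped.
  int64_t row_offset = 0;
  for (size_t i = 0; i < rows_num.size(); ++i) {
    if (rows_num[i] > 0) {
      std::printf("rank %zu -> dst rows [%lld, %lld)\n", i,
                  static_cast<long long>(row_offset),
                  static_cast<long long>(row_offset + rows_num[i]));
      row_offset += rows_num[i];
    }
  }
  return 0;
}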
void AllReduce(const framework::Variable &src, framework::Variable *dst,
const ParallelStrategy &strategy, cudaStream_t stream) {
const ParallelStrategy &strategy, int ring_id,
bool use_calc_stream) {
const auto &place = GetVarPlace(src);
auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(place));
platform::NCCLComm *comm =
platform::NCCLCommContext::Instance().Get(ring_id, place);
cudaStream_t stream = (use_calc_stream ? dev_ctx->stream() : comm->stream());
if (src.IsType<framework::LoDTensor>()) {
if (!dst->IsType<framework::LoDTensor>()) {
dst->Clear();
}
AllReduce(src.Get<framework::LoDTensor>(),
dst->GetMutable<framework::LoDTensor>(), strategy, stream);
dst->GetMutable<framework::LoDTensor>(), stream, comm);
#if NCCL_VERSION_CODE >= 2212
} else if (src.IsType<framework::SelectedRows>()) {
if (&src != dst) {
......@@ -147,13 +165,16 @@ void AllReduce(const framework::Variable &src, framework::Variable *dst,
dst->Clear();
}
AllReduce(src.Get<framework::SelectedRows>(),
dst->GetMutable<framework::SelectedRows>(), strategy, stream);
dst->GetMutable<framework::SelectedRows>(), strategy, stream,
comm);
} else {
// SelectedRows cannot be allreduced in-place
framework::Variable tmp_dst;
AllReduce(src.Get<framework::SelectedRows>(),
tmp_dst.GetMutable<framework::SelectedRows>(), strategy,
stream);
tmp_dst.GetMutable<framework::SelectedRows>(), strategy, stream,
comm);
// the stream must be synchronized to ensure the correctness of the move operation
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamSynchronize(stream));
*dst = std::move(tmp_dst);
}
#endif
......@@ -165,33 +186,9 @@ void AllReduce(const framework::Variable &src, framework::Variable *dst,
}
}
static const platform::Place &GetVarPlace(const framework::Variable &src) {
if (src.IsType<framework::LoDTensor>()) {
return src.Get<framework::LoDTensor>().place();
#if NCCL_VERSION_CODE >= 2212
} else if (src.IsType<framework::SelectedRows>()) {
return src.Get<framework::SelectedRows>().value().place();
#endif
} else {
PADDLE_THROW(platform::errors::InvalidArgument(
"Cannot get unsupported variable type %s for imperative allreduce, "
"only "
"LoDTensor and SelectedRows are supported.",
platform::demangle(framework::ToTypeName(src.Type()))));
}
}
void AllReduce(const framework::Variable &src, framework::Variable *dst,
const ParallelStrategy &strategy) {
const auto &place = GetVarPlace(src);
PADDLE_ENFORCE_EQ(
platform::is_gpu_place(place), true,
platform::errors::Unimplemented(
"Imperative mode does not support multi-CPU training yet."));
auto *dev_ctx = static_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(place));
auto stream = dev_ctx->stream();
AllReduce(src, dst, strategy, stream);
AllReduce(src, dst, strategy, 0, true);
}
} // namespace imperative
......
......@@ -19,11 +19,17 @@
#include <cuda.h>
#include <cuda_runtime.h>
#include <nccl.h>
#include <string>
#include <utility>
#include "paddle/fluid/framework/lod_tensor.h"
#include "paddle/fluid/framework/scope.h"
#include "paddle/fluid/framework/selected_rows.h"
#include "paddle/fluid/framework/variable.h"
#include "paddle/fluid/imperative/nccl_context.h"
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/nccl_helper.h"
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace framework {
......@@ -40,7 +46,8 @@ void AllReduce(const framework::Variable &src, framework::Variable *dst,
const ParallelStrategy &strategy);
void AllReduce(const framework::Variable &src, framework::Variable *dst,
const ParallelStrategy &strategy, cudaStream_t stream);
const ParallelStrategy &strategy, int ring_id,
bool use_calc_stream);
} // namespace imperative
} // namespace paddle
......
......@@ -17,8 +17,10 @@
namespace paddle {
namespace imperative {
#if defined(PADDLE_WITH_NCCL)
void NCCLParallelContext::RecvNCCLID(const std::string &ep,
ncclUniqueId *nccl_id) {
void NCCLParallelContext::RecvNCCLID(
const std::string &ep,
std::vector<ncclUniqueId> &nccl_ids) { // NOLINT
int nrings = nccl_ids.size();
auto addr = paddle::string::Split(ep, ':');
PADDLE_ENFORCE_EQ(
addr.size(), 2UL,
......@@ -85,14 +87,16 @@ void NCCLParallelContext::RecvNCCLID(const std::string &ep,
}
VLOG(3) << "recevived the ncclUniqueId";
memcpy(nccl_id, buffer, NCCL_UNIQUE_ID_BYTES);
memcpy(&nccl_ids[0], buffer, nrings * NCCL_UNIQUE_ID_BYTES);
VLOG(3) << "closing the socket server: " << ep;
close(server_fd);
}
void NCCLParallelContext::SendNCCLID(const std::string &ep,
ncclUniqueId *nccl_id) {
void NCCLParallelContext::SendNCCLID(
const std::string &ep, const std::vector<ncclUniqueId> &nccl_ids) {
int nrings = nccl_ids.size();
auto addr = paddle::string::Split(ep, ':');
PADDLE_ENFORCE_EQ(
addr.size(), 2UL,
......@@ -100,12 +104,12 @@ void NCCLParallelContext::SendNCCLID(const std::string &ep,
"The endpoint should contain host and port, but got %s.", ep));
std::string host = addr[0];
int port = std::stoi(addr[1]);
// struct sockaddr_in address;
int sock = 0;
struct sockaddr_in serv_addr;
char buffer[1024] = {0};
memcpy(buffer, nccl_id, NCCL_UNIQUE_ID_BYTES);
memcpy(buffer, &nccl_ids[0], nrings * NCCL_UNIQUE_ID_BYTES);
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
PADDLE_THROW(platform::errors::Unavailable("Create socket failed."));
}
......@@ -149,40 +153,46 @@ void NCCLParallelContext::SendNCCLID(const std::string &ep,
continue;
}
VLOG(3) << "sending the ncclUniqueId to " << ep;
send(sock, buffer, NCCL_UNIQUE_ID_BYTES, 0);
send(sock, buffer, NCCL_UNIQUE_ID_BYTES * nrings, 0);
break;
}
close(sock);
}
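SendNCCLID and RecvNCCLID above ship all ring ids in one shot: the nrings ncclUniqueId structs are copied back to back into a single flat buffer and transferred with one send/recv. Since the buffer is a fixed 1024 bytes and each ncclUniqueId is NCCL_UNIQUE_ID_BYTES (128) bytes, at most 8 rings fit, which presumably is why ParallelEnv later asserts nccl_nrings < 9. A minimal sketch of that packing, assuming only nccl.h (hypothetical helper names, not Paddle code):

#include <cstring>
#include <vector>
#include <nccl.h>

// Copy nrings unique ids contiguously into the wire buffer (sender side).
void PackNCCLIds(const std::vector<ncclUniqueId> &ids, char *buffer) {
  std::memcpy(buffer, ids.data(), ids.size() * NCCL_UNIQUE_ID_BYTES);
}

// Recover nrings unique ids from the wire buffer (receiver side); the
// receiver must already know nrings and have resized `ids` accordingly.
void UnpackNCCLIds(const char *buffer, std::vector<ncclUniqueId> *ids) {
  std::memcpy(ids->data(), buffer, ids->size() * NCCL_UNIQUE_ID_BYTES);
}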
void NCCLParallelContext::BcastNCCLId(ncclUniqueId *nccl_id, int root) {
void NCCLParallelContext::BcastNCCLId(
std::vector<ncclUniqueId> &nccl_ids, // NOLINT
int root) {
if (strategy_.local_rank_ == root) {
for (auto ep : strategy_.trainer_endpoints_) {
if (ep != strategy_.current_endpoint_) SendNCCLID(ep, nccl_id);
if (ep != strategy_.current_endpoint_) SendNCCLID(ep, nccl_ids);
}
} else {
RecvNCCLID(strategy_.current_endpoint_, nccl_id);
RecvNCCLID(strategy_.current_endpoint_, nccl_ids);
}
}
void NCCLParallelContext::Init() {
for (int ring_id = 0; ring_id < strategy_.nrings_; ring_id++) {
ncclUniqueId nccl_id;
if (strategy_.local_rank_ == 0) {
// generate the unique ncclid on the root worker
platform::dynload::ncclGetUniqueId(&nccl_id);
BcastNCCLId(&nccl_id, 0);
} else {
BcastNCCLId(&nccl_id, 0);
std::vector<ncclUniqueId> nccl_ids;
nccl_ids.resize(strategy_.nrings_);
if (strategy_.local_rank_ == 0) {
// generate the unique ncclid on the root worker
for (size_t i = 0; i < nccl_ids.size(); ++i) {
platform::dynload::ncclGetUniqueId(&nccl_ids[i]);
}
int gpu_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device;
BcastNCCLId(nccl_ids, 0);
} else {
BcastNCCLId(nccl_ids, 0);
}
int gpu_id = BOOST_GET_CONST(platform::CUDAPlace, place_).device;
for (int ring_id = 0; ring_id < strategy_.nrings_; ring_id++) {
VLOG(0) << "init nccl context nranks: " << strategy_.nranks_
<< " local rank: " << strategy_.local_rank_ << " gpu id: " << gpu_id
<< " ring id: " << ring_id;
// CreateNCCLComm also assigns nccl_comm into the CUDADeviceContext for this ring_id
platform::NCCLCommContext::Instance().CreateNCCLComm(
&nccl_id, strategy_.nranks_, strategy_.local_rank_, gpu_id, ring_id);
&nccl_ids[ring_id], strategy_.nranks_, strategy_.local_rank_, gpu_id,
ring_id);
}
}
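At the NCCL level, Init() above boils down to creating one communicator per ring on the local device, each later paired with its own communication stream. A standalone sketch of that per-ring setup, assuming the nccl_ids vector has already been agreed on by all ranks (hypothetical names, error handling omitted, not the Paddle helper):

#include <vector>
#include <cuda_runtime.h>
#include <nccl.h>

struct RingResources {
  std::vector<ncclComm_t> comms;      // one communicator per ring
  std::vector<cudaStream_t> streams;  // one dedicated stream per ring
};

RingResources CreateRings(const std::vector<ncclUniqueId> &nccl_ids,
                          int nranks, int rank, int gpu_id) {
  RingResources res;
  cudaSetDevice(gpu_id);
  for (size_t ring = 0; ring < nccl_ids.size(); ++ring) {
    ncclComm_t comm = nullptr;
    ncclCommInitRank(&comm, nranks, nccl_ids[ring], rank);
    cudaStream_t stream = nullptr;
    cudaStreamCreate(&stream);
    res.comms.push_back(comm);
    res.streams.push_back(stream);
  }
  return res;
}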
......@@ -193,15 +203,7 @@ void NCCLParallelContext::AllReduceByStream(const framework::Variable &src,
platform::is_gpu_place(place_), true,
platform::errors::Unimplemented(
"Dynamic graph mode does not support multi-CPU training yet."));
auto comm = platform::NCCLCommContext::Instance().Get(ring_id, place_);
cudaStream_t stream = nullptr;
if (use_calc_stream) {
auto dev_ctx = platform::DeviceContextPool::Instance().Get(place_);
stream = static_cast<platform::CUDADeviceContext *>(dev_ctx)->stream();
} else {
stream = comm->stream();
}
AllReduce(src, dst, strategy_, stream);
AllReduce(src, dst, strategy_, ring_id, use_calc_stream);
}
paddle::platform::CUDADeviceContext *NCCLParallelContext::GetDeviceContext(
......
......@@ -73,6 +73,8 @@ class ParallelContext {
int ring_id) = 0;
#endif
inline int GetNRings() { return strategy_.nrings_; }
protected:
ParallelStrategy strategy_;
platform::Place place_;
......@@ -87,7 +89,7 @@ class NCCLParallelContext : public ParallelContext {
~NCCLParallelContext() {}
void BcastNCCLId(ncclUniqueId* nccl_id, int root);
void BcastNCCLId(std::vector<ncclUniqueId>& nccl_ids, int root); // NOLINT
void Init() override;
......@@ -98,9 +100,11 @@ class NCCLParallelContext : public ParallelContext {
paddle::platform::CUDADeviceContext* GetDeviceContext(int ring_id) override;
protected:
void RecvNCCLID(const std::string& endpoint, ncclUniqueId* nccl_id);
void RecvNCCLID(const std::string& endpoint,
std::vector<ncclUniqueId>& nccl_ids); // NOLINT
void SendNCCLID(const std::string& endpoint, ncclUniqueId* nccl_id);
void SendNCCLID(const std::string& endpoint,
const std::vector<ncclUniqueId>& nccl_ids);
};
#endif
......
......@@ -68,7 +68,7 @@ void Group::SplitTensors(const platform::CUDADeviceContext &context) {
std::ostream &operator<<(std::ostream &out, const Group &group) {
const auto &vars = group.variable_indices_;
out << "numul: " << group.all_length_ << " ;is_sparse: " << group.is_sparse_
out << "numel: " << group.all_length_ << " ;is_sparse: " << group.is_sparse_
<< " ;var number: " << vars.size() << "\n";
auto begin = vars.begin();
auto end = vars.end();
......@@ -95,6 +95,7 @@ Reducer::Reducer(const std::vector<std::shared_ptr<imperative::VarBase>> &vars,
parallel_ctx_(parallel_ctx),
group_size_limits_(group_size_limits) {
VLOG(3) << "Start construct the Reducer ...";
nrings_ = parallel_ctx->GetNRings();
// initialize groups
InitializeGroups(group_indices);
for (size_t global_var_index = 0; global_var_index < vars_.size();
......@@ -109,11 +110,13 @@ Reducer::Reducer(const std::vector<std::shared_ptr<imperative::VarBase>> &vars,
compute_stream_ = static_cast<platform::CUDADeviceContext *>(
platform::DeviceContextPool::Instance().Get(place_))
->stream();
comm_stream_ = platform::NCCLCommContext::Instance().Get(0, place_)->stream();
// create events
for (int i = 0; i < nrings_; ++i) {
comm_streams_.emplace_back(
platform::NCCLCommContext::Instance().Get(i, place_)->stream());
comm_events_.emplace_back(platform::CudaEventResourcePool::Instance().New(
BOOST_GET_CONST(platform::CUDAPlace, place_).device));
}
CreateGroupEvents(group_indices.size());
comm_enent_ = platform::CudaEventResourcePool::Instance().New(
BOOST_GET_CONST(platform::CUDAPlace, place_).device);
std::call_once(once_flag_, []() {
std::atexit([]() { Reducer::GetInstance()->ReleaseReducer(); });
......@@ -121,20 +124,22 @@ Reducer::Reducer(const std::vector<std::shared_ptr<imperative::VarBase>> &vars,
}
void Reducer::ReleaseReducer() {
for (auto &event : events_) {
for (auto &event : group_events_) {
event.reset();
}
for (auto &event : comm_events_) {
event.reset();
}
comm_enent_.reset();
}
void Reducer::CreateGroupEvents(int group_num) {
// release old events
for (auto &event : events_) {
for (auto &event : group_events_) {
event.reset();
}
events_.clear();
events_.resize(group_num);
for (auto &event : events_) {
group_events_.clear();
group_events_.resize(group_num);
for (auto &event : group_events_) {
event = platform::CudaEventResourcePool::Instance().New(
BOOST_GET_CONST(platform::CUDAPlace, place_).device);
}
......@@ -194,7 +199,7 @@ void Reducer::InitializeDenseGroups(
// Each parameter will be initialized according to the group information.
// For the sparse parameter, sparse_contents_ in the group directly points
// to the parameter. For dense parameters, first construct an empty Tensor().
// Then specify the actual memory in MarkVariableReady.
// Then specify the actual memory in MarkDenseVarReady.
void Reducer::InitializeGroups(
const std::vector<std::vector<size_t>> &group_indices) {
VLOG(3) << "Start initialize groups ..";
......@@ -218,7 +223,6 @@ void Reducer::InitializeGroups(
if (variable_indices_.size() == 1 &&
is_sparse_gradient_[variable_indices_.front()]) {
// process the sparse gradient. one sparse, one group
group.sparse_contents_ = first_varbase->MutableGradVar();
group.dtype_ = first_varbase->DataType();
group.is_sparse_ = true;
} else {
......@@ -232,7 +236,7 @@ void Reducer::InitializeGroups(
// map variables to this group by VariableLocator
size_t inside_group_index = 0;
for (const auto var_index : group_indices[group_index]) {
for (const auto var_index : variable_indices_) {
variable_locators_[var_index] = VariableLocator{
.group_index = group_index,
.inside_group_index = inside_group_index++,
......@@ -260,7 +264,7 @@ void Reducer::PrepareForBackward() {
// Add hook function to each leaf node. When the gradient of a leaf node is
// generated, if it is the sparse parameter, it will directly execute allreduce,
// if it is the dense parameter, it will execute three steps: 1,
// MarkVariableReady. Find the position of the corresponding group
// MarkDenseVarReady. Find the position of the corresponding group
// through var_index, share the gradient memory and the group dense_tensors,
// the group counter is reduced by 1. 2, MarkGroupReady: When the group
// counter is 0, it means that allreduce can be emitted, and
......@@ -278,8 +282,11 @@ void Reducer::AddDistHook(VariableWrapper *var_warpper, size_t var_index) {
if (!group.is_sparse_) {
// Only dense_contents_ needs a memory copy
MarkVariableReady(var_index, var_warpper);
MarkDenseVarReady(var_index, var_warpper);
} else {
MarkSparseVarReady(var_index, var_warpper);
}
if (--group.pending_ == 0) {
// can start allreduce
MarkGroupReady(group_index);
......@@ -290,7 +297,7 @@ void Reducer::AddDistHook(VariableWrapper *var_warpper, size_t var_index) {
}
}
void Reducer::MarkVariableReady(size_t var_index,
void Reducer::MarkDenseVarReady(size_t var_index,
VariableWrapper *var_warpper) {
const auto &var_locator = variable_locators_[var_index];
auto group_index = var_locator.group_index;
......@@ -303,6 +310,14 @@ void Reducer::MarkVariableReady(size_t var_index,
{static_cast<int64_t>(length)});
}
void Reducer::MarkSparseVarReady(size_t var_index,
VariableWrapper *var_warpper) {
const auto &var_locator = variable_locators_[var_index];
auto group_index = var_locator.group_index;
auto &group = groups_[group_index];
group.sparse_contents_ = var_warpper->MutableVar();
}
void Reducer::MarkGroupReady(size_t group_index) {
if (group_index > next_group_) {
VLOG(3) << "It will adjust the order of group in next batch automatically";
......@@ -310,29 +325,35 @@ void Reducer::MarkGroupReady(size_t group_index) {
}
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventRecord(events_[group_index].get(), compute_stream_));
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaStreamWaitEvent(comm_stream_, events_[group_index].get(), 0));
cudaEventRecord(group_events_[group_index].get(), compute_stream_));
for (int i = 0; i < nrings_; ++i) {
PADDLE_ENFORCE_CUDA_SUCCESS(cudaStreamWaitEvent(
comm_streams_[i], group_events_[group_index].get(), 0));
}
for (; next_group_ < groups_.size() && groups_[next_group_].pending_ == 0;
++next_group_) {
auto &group = groups_[next_group_];
int run_order = next_group_ % nrings_;
if (group.is_sparse_) {
VLOG(3) << "sparse group [" << next_group_ << "] start allreduce...";
parallel_ctx_->AllReduceByStream(*group.sparse_contents_,
group.sparse_contents_, 0, false);
VLOG(3) << "sparse group [" << next_group_ << "] start allreduce in ring["
<< run_order << "]";
parallel_ctx_->AllReduceByStream(
*group.sparse_contents_, group.sparse_contents_, run_order, false);
} else {
VLOG(3) << "dense group [" << next_group_ << "] start allreduce...";
VLOG(3) << "dense group [" << next_group_ << "] start allreduce in ring["
<< run_order << "]";
// Select the comm stream of this ring to concat tensors
// group.dense_tensors ---> group.dense_contents_
group.ConcatTensors(*parallel_ctx_->GetDeviceContext(0));
group.ConcatTensors(*parallel_ctx_->GetDeviceContext(run_order));
// Start allreduce
parallel_ctx_->AllReduceByStream(group.dense_contents_,
&(group.dense_contents_), 0, false);
parallel_ctx_->AllReduceByStream(
group.dense_contents_, &(group.dense_contents_), run_order, false);
// Select the comm stream of this ring to split tensors
// group.dense_contents_ ---> group.dense_tensors
group.SplitTensors(*parallel_ctx_->GetDeviceContext(0));
group.SplitTensors(*parallel_ctx_->GetDeviceContext(run_order));
}
}
}
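MarkGroupReady above assigns rings round-robin (run_order = next_group_ % nrings_), so consecutive ready groups are reduced on different communicators and their allreduces can overlap across streams. A tiny standalone illustration of that mapping with hypothetical counts:

#include <cstdio>

int main() {
  const int nrings = 2;   // hypothetical FLAGS_nccl_nrings
  const int ngroups = 5;  // hypothetical number of gradient groups
  for (int group = 0; group < ngroups; ++group) {
    // Same rule as MarkGroupReady: group k is allreduced on ring k % nrings.
    std::printf("group %d -> ring %d\n", group, group % nrings);
  }
  return 0;
}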
......@@ -351,9 +372,16 @@ std::vector<std::vector<size_t>> Reducer::RebuildGruops() {
}
void Reducer::FinalizeBackward() {
PADDLE_ENFORCE_CUDA_SUCCESS(cudaEventRecord(comm_enent_.get(), comm_stream_));
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaStreamWaitEvent(compute_stream_, comm_enent_.get(), 0));
// The compute_stream_ must not proceed until all comm streams have finished
for (int i = 0; i < nrings_; ++i) {
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaEventRecord(comm_events_[i].get(), comm_streams_[i]));
}
for (int i = 0; i < nrings_; ++i) {
PADDLE_ENFORCE_CUDA_SUCCESS(
cudaStreamWaitEvent(compute_stream_, comm_events_[i].get(), 0));
}
if (!has_rebuilt_group_) {
VLOG(3) << "Start rebuilding the groups";
auto rebuild_group_indices = RebuildGruops();
......@@ -362,6 +390,7 @@ void Reducer::FinalizeBackward() {
CreateGroupEvents(rebuild_group_number);
InitializeGroups(group_indices_);
}
VLOG(3) << "In the batch, Reducer is finished...";
}
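The event handling above forms a pair of fences: MarkGroupReady makes every comm stream wait on work already queued on the compute stream, and FinalizeBackward makes the compute stream wait until every comm stream has finished. A minimal CUDA-only sketch of that pattern, independent of Paddle (hypothetical function names; streams and events are assumed to be created elsewhere):

#include <vector>
#include <cuda_runtime.h>

// Fence used before launching a group's allreduce: every comm stream waits
// for whatever the compute stream has queued so far.
void FenceCommOnCompute(cudaStream_t compute,
                        const std::vector<cudaStream_t> &comm,
                        cudaEvent_t event) {
  cudaEventRecord(event, compute);
  for (cudaStream_t s : comm) {
    cudaStreamWaitEvent(s, event, 0);
  }
}

// Fence used at the end of backward: the compute stream waits until every
// comm stream has drained, so the next step sees fully reduced gradients.
void FenceComputeOnComm(cudaStream_t compute,
                        const std::vector<cudaStream_t> &comm,
                        const std::vector<cudaEvent_t> &events) {
  for (size_t i = 0; i < comm.size(); ++i) {
    cudaEventRecord(events[i], comm[i]);
    cudaStreamWaitEvent(compute, events[i], 0);
  }
}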
......
......@@ -16,6 +16,7 @@
#include <algorithm>
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>
......@@ -133,7 +134,9 @@ class Reducer {
void AddDistHook(VariableWrapper* var_warpper, size_t var_index);
void MarkVariableReady(size_t var_index, VariableWrapper* var_warpper);
void MarkDenseVarReady(size_t var_index, VariableWrapper* var_warpper);
void MarkSparseVarReady(size_t var_index, VariableWrapper* var_warpper);
void MarkGroupReady(size_t group_index);
......@@ -180,10 +183,11 @@ class Reducer {
std::vector<VariableLocator> variable_locators_;
// Following variables are to help sync stream
std::vector<std::shared_ptr<platform::CudaEventObject>> events_;
std::shared_ptr<platform::CudaEventObject> comm_enent_;
std::vector<std::shared_ptr<platform::CudaEventObject>> group_events_;
std::vector<std::shared_ptr<platform::CudaEventObject>> comm_events_;
cudaStream_t compute_stream_;
cudaStream_t comm_stream_;
std::vector<cudaStream_t> comm_streams_;
int nrings_ = 1;
// Following variables are to help rebuild group
bool has_rebuilt_group_{false};
......
......@@ -19,6 +19,7 @@
namespace imperative = paddle::imperative;
namespace platform = paddle::platform;
int nrings = 2;
imperative::ParallelStrategy GetStrategy(int local_rank) {
std::vector<std::string> eps = {"127.0.0.1:9866", "localhost:9867"};
imperative::ParallelStrategy strategy;
......@@ -26,27 +27,38 @@ imperative::ParallelStrategy GetStrategy(int local_rank) {
strategy.current_endpoint_ = eps[local_rank];
strategy.nranks_ = 2;
strategy.local_rank_ = local_rank;
strategy.nrings_ = nrings;
return strategy;
}
#if defined(PADDLE_WITH_NCCL)
void BcastNCCLId(int local_rank, ncclUniqueId *nccl_id) {
void BcastNCCLId(int local_rank, std::vector<ncclUniqueId>* nccl_ids) {
auto strategy = GetStrategy(local_rank);
platform::CUDAPlace gpu(local_rank);
imperative::NCCLParallelContext ctx(strategy, gpu);
ctx.BcastNCCLId(nccl_id, 0);
ctx.BcastNCCLId(*nccl_ids, 0);
}
TEST(BcastNCCLId, Run) {
ncclUniqueId nccl_id;
platform::dynload::ncclGetUniqueId(&nccl_id);
std::thread t(BcastNCCLId, 0, &nccl_id);
std::vector<ncclUniqueId> nccl_ids;
nccl_ids.resize(nrings);
for (int i = 0; i < nrings; ++i) {
platform::dynload::ncclGetUniqueId(&nccl_ids[i]);
}
ncclUniqueId recv_nccl_id;
BcastNCCLId(1, &recv_nccl_id);
std::thread t(BcastNCCLId, 0, &nccl_ids);
std::vector<ncclUniqueId> recv_nccl_ids;
recv_nccl_ids.resize(nrings);
for (int i = 0; i < nrings; ++i) {
platform::dynload::ncclGetUniqueId(&recv_nccl_ids[i]);
}
BcastNCCLId(1, &recv_nccl_ids);
t.join();
EXPECT_EQ(0, std::memcmp(nccl_id.internal, recv_nccl_id.internal,
NCCL_UNIQUE_ID_BYTES));
for (int i = 0; i < nrings; ++i) {
EXPECT_EQ(0, std::memcmp(nccl_ids[i].internal, recv_nccl_ids[i].internal,
NCCL_UNIQUE_ID_BYTES));
}
}
#endif
......@@ -33,7 +33,7 @@ TEST(TestGroup, TestPrintGroupMessage) {
std::stringstream stream1, stream2;
stream1 << group;
ASSERT_STREQ(stream1.str().c_str(),
"numul: 0 ;is_sparse: 0 ;var number: 0\n[]\n");
"numel: 0 ;is_sparse: 0 ;var number: 0\n[]\n");
std::vector<size_t> vars;
size_t vars_num = 102;
......@@ -44,7 +44,7 @@ TEST(TestGroup, TestPrintGroupMessage) {
group.all_length_ = 102;
group.is_sparse_ = false;
std::string head = "numul: 102 ;is_sparse: 0 ;var number: 102\n";
std::string head = "numel: 102 ;is_sparse: 0 ;var number: 102\n";
head = head + "[";
auto begin = vars.begin();
auto end = vars.end();
......
......@@ -1261,7 +1261,13 @@ void BindImperative(py::module *m_ptr) {
return self.current_endpoint_;
},
[](imperative::ParallelStrategy &self,
const std::string &ep) { self.current_endpoint_ = ep; });
const std::string &ep) { self.current_endpoint_ = ep; })
.def_property(
"nrings",
[](const imperative::ParallelStrategy &self) { return self.nrings_; },
[](imperative::ParallelStrategy &self, int nrings) {
self.nrings_ = nrings;
});
m.def(
"dygraph_partial_grad",
......
......@@ -16,6 +16,7 @@ from __future__ import print_function
import copy
import warnings
import paddle
import os
from paddle.fluid.framework import dygraph_only
from paddle.fluid import compiler
from .role_maker import UserDefinedRoleMaker, PaddleCloudRoleMaker, RoleMakerBase
......@@ -221,6 +222,15 @@ class Fleet(object):
warnings.warn(
"The dygraph parallel environment has been initialized.")
else:
# FLAGS_nccl_nrings is used for dynamic graph multi-stream communication
if "FLAGS_nccl_nrings" in os.environ:
warnings.warn(
"You have set the environment variable FLAGS_nccl_nrings "
"outside the program, so the nccl_comm_num in "
"DistributedStrategy will not take effect here.")
else:
os.environ["FLAGS_nccl_nrings"] = str(
self._user_defined_strategy.nccl_comm_num)
paddle.distributed.init_parallel_env()
def is_first_worker(self):
......
......@@ -166,6 +166,7 @@ def init_parallel_env():
strategy.local_rank = parallel_env.rank
strategy.trainer_endpoints = parallel_env.trainer_endpoints
strategy.current_endpoint = parallel_env.current_endpoint
strategy.nrings = parallel_env.nrings
# NOTE(chenweihang): [ why config global place here? ]
# the dygraph mode will be set to default mode,
......
......@@ -114,6 +114,11 @@ class ParallelEnv(object):
self._trainer_endpoints = os.getenv("PADDLE_TRAINER_ENDPOINTS",
"").split(",")
self._current_endpoint = os.getenv("PADDLE_CURRENT_ENDPOINT", "")
self._nrings = int(os.getenv("FLAGS_nccl_nrings", "1"))
assert self._nrings > 0, \
"nccl_nrings must be an integer greater than 0."
assert self._nrings < 9, \
"nccl_nrings should be less than 9, which is enough in most scenarios."
@property
def rank(self):
......@@ -211,6 +216,25 @@ class ParallelEnv(object):
"""
return self._trainer_endpoints
@property
def nrings(self):
"""
The number of communication rings used by the current trainer.
Its value is equal to the value of the environment variable ``FLAGS_nccl_nrings``. The default value is 1.
Examples:
.. code-block:: python
# execute this command in terminal: export FLAGS_nccl_nrings=1
import paddle.distributed as dist
env = dist.ParallelEnv()
print("The nrings is %d" % env.nrings)
# the number of rings is 1
"""
return self._nrings
# [aliases] Compatible with old method names
local_rank = rank
nranks = world_size
......@@ -397,8 +421,8 @@ class DataParallel(layers.Layer):
else:
warnings.warn("The program will return to single-card operation. "
"Please check 1, whether you use spawn or fleetrun "
"to start the program. 2. Whether it is a multi-card "
"program. 3. Is the current environment multi-card.")
"to start the program. 2, Whether it is a multi-card "
"program. 3, Is the current environment multi-card.")
def init_reducer(self):
layers_param = []
......@@ -424,7 +448,7 @@ class DataParallel(layers.Layer):
if isinstance(sublayer, paddle.nn.layer.common.Embedding):
return sublayer._sparse
# NOTE(shenliang03):This is for compatibility. If paddle.fluid.dygraph.Embedding
# is removed in the future, the judgment will also be removed here.
# is removed in the future, the check will also be removed here.
if isinstance(sublayer, paddle.fluid.dygraph.Embedding):
return sublayer._is_sparse
return False
......