Unverified commit f38e09f0 authored by Wen Sun, committed by GitHub

refactor: rm redundant funcs (#48149)

Parent c775bc69
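This commit prunes overloads that existed only to repeat each operation per (sync_op, use_calc_stream) combination: throwing default virtuals on ProcessGroup, thin forwarding wrappers on ProcessGroupStream, and their NCCL counterparts. A hedged sketch of the duplicated shape being removed (stand-in types, not the actual Paddle declarations):

#include <memory>
#include <stdexcept>
#include <vector>

struct Task {};
using Tensors = std::vector<float>;  // stand-in for std::vector<phi::DenseTensor>

// Before: each send/recv/collective had near-duplicate virtuals, differing
// only in the sync_op flag and each merely throwing.
class ProcessGroupBefore {
 public:
  virtual ~ProcessGroupBefore() = default;
  virtual std::shared_ptr<Task> Recv(Tensors&, int /*src_rank*/) {
    throw std::invalid_argument("recv not supported");
  }
  virtual std::shared_ptr<Task> Recv(Tensors&, int /*src_rank*/,
                                     bool /*sync_op*/) {
    throw std::invalid_argument("recv with sync_op not supported");
  }
};

// After: a single hook per operation remains on the base interface.
class ProcessGroupAfter {
 public:
  virtual ~ProcessGroupAfter() = default;
  virtual std::shared_ptr<Task> Recv(Tensors&, int /*src_rank*/) {
    throw std::invalid_argument("recv not supported");
  }
};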
@@ -244,26 +244,12 @@ class ProcessGroup {
"ProcessGroup%s does not support send", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Send(
std::vector<phi::DenseTensor>&, int, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support send with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>&, int) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support recv", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>&, int, bool) { // NOLINT
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support recv with sync_op flag",
GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllGather(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&) { // NOLINT
@@ -287,14 +273,6 @@ class ProcessGroup {
"ProcessGroup%s does not support AllToAll", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
bool) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support alltoall", GetBackendName()));
}
virtual std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>&, // NOLINT
std::vector<phi::DenseTensor>&, // NOLINT
......
@@ -626,35 +626,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::PointToPoint(
return task;
}
void ProcessGroupNCCL::CheckSplitSizes(std::vector<int64_t>* split_sizes,
std::vector<int64_t> tensor_shape) {
int64_t len_size = (*split_sizes).size();
if (len_size == 0) {
PADDLE_ENFORCE_EQ(tensor_shape[0] % size_ == 0,
true,
platform::errors::InvalidArgument(
"Tensor's dim[0] must be divisible by group size "
"when split_sizes not given."));
(*split_sizes)
.insert((*split_sizes).end(),
size_,
static_cast<int64_t>(tensor_shape[0] / size_));
} else {
PADDLE_ENFORCE_EQ(
len_size == size_,
true,
platform::errors::InvalidArgument(
"The length of split_sizes must be equal to group size."));
auto sum_size = std::accumulate(
(*split_sizes).begin(), (*split_sizes).end(), static_cast<int64_t>(0));
PADDLE_ENFORCE_EQ(
sum_size == tensor_shape[0],
true,
platform::errors::InvalidArgument(
"The sum of split_sizes must be equal to tensor's dim[0]."));
}
}
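// Hedged, standalone restatement of the rule CheckSplitSizes enforces above
// (plain asserts instead of PADDLE_ENFORCE; names and values are illustrative):
#include <cassert>
#include <cstdint>
#include <numeric>
#include <vector>

std::vector<int64_t> NormalizeSplitSizes(std::vector<int64_t> split_sizes,
                                         int64_t dim0, int64_t group_size) {
  if (split_sizes.empty()) {
    // No sizes given: dim[0] must split evenly across the group.
    assert(dim0 % group_size == 0);
    split_sizes.assign(group_size, dim0 / group_size);
  } else {
    // Sizes given: one entry per rank, summing to dim[0].
    assert(static_cast<int64_t>(split_sizes.size()) == group_size);
    assert(std::accumulate(split_sizes.begin(), split_sizes.end(),
                           int64_t{0}) == dim0);
  }
  return split_sizes;
}
// e.g. NormalizeSplitSizes({}, /*dim0=*/8, /*group_size=*/4) -> {2, 2, 2, 2};
// {3, 3, 1, 1} passes both checks; {3, 3, 3} trips the length check.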
// TODO(sunyilun): methods below will be removed later
void SyncDefaultStream(const std::vector<Place>& places,
platform::DeviceEvent& nccl_event, // NOLINT
@@ -676,17 +647,6 @@ std::shared_ptr<ProcessGroupNCCL::NCCLTask> ProcessGroupNCCL::CreateTask(
places, rank, comm_type, inputs);
}
std::shared_ptr<ProcessGroupNCCL::NCCLTask> ProcessGroupNCCL::CreateTask(
const std::vector<Place>& places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool is_sync,
bool use_calc_stream) {
return std::make_shared<ProcessGroupNCCL::NCCLTask>(
places, rank, comm_type, inputs, is_sync, use_calc_stream);
}
ProcessGroupNCCL::NCCLTask::NCCLTask(
const std::vector<Place>& places,
int rank,
@@ -696,17 +656,6 @@ ProcessGroupNCCL::NCCLTask::NCCLTask(
comm_event_(places[0]),
task_place_(places[0]) {}
ProcessGroupNCCL::NCCLTask::NCCLTask(
const std::vector<Place>& places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool sync_op,
bool use_calc_stream)
: TaskStream(rank, inputs, comm_type, sync_op, use_calc_stream),
comm_event_(places[0]),
task_place_(places[0]) {}
// create NCCLManager cache for places_key
void ProcessGroupNCCL::CreateNCCLManagerCache(
const std::string& places_key, const std::vector<Place>& places) {
@@ -759,85 +708,6 @@ void ProcessGroupNCCL::CreateNCCLManagerCache(
places_to_ctx_.emplace(places_key, std::move(dev_ctx_raw));
}
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
std::vector<phi::DenseTensor>& inputs,
std::vector<phi::DenseTensor>& outputs,
Fn fn,
CommType comm_type,
bool sync_op,
bool use_calc_stream) {
const auto& places = GetPlaceList(inputs);
const auto& key = GetKeyFromPlaces(places);
{
std::lock_guard<std::mutex> lock(mutex_);
if (place_to_comm_ctx_.find(key) == place_to_comm_ctx_.end()) {
CreateNCCLManagerCache(key, places);
}
}
if (!use_calc_stream) {
SyncDefaultStream(
places, place_to_calc_event_.at(key), places_to_ctx_.at(key));
}
auto task =
CreateTask(places, rank_, comm_type, inputs, sync_op, use_calc_stream);
platform::CUDADeviceGuard cuda_guard;
{
platform::NCCLGroupGuard nccl_guard;
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);
gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_.at(key)[i]->stream();
}
fn(inputs[i],
outputs[i],
places_to_ctx_.at(key)[i]->nccl_comm(),
nccl_stream);
}
}
if (FLAGS_use_stream_safe_cuda_allocator) {
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);
gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_.at(key)[i]->stream();
}
memory::RecordStream(inputs[i].Holder(), nccl_stream);
}
}
// Add a stream event dependency only when using the comm stream
if (!use_calc_stream) {
for (size_t i = 0; i < inputs.size(); ++i) {
cuda_guard.SetDevice(places[i]);
task->UpdateWaitChain(*places_to_ctx_.at(key)[i]);
}
}
return task;
}
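// Shape shared by the removed helper above and by PointToPoint below:
// (1) launch the per-place NCCL calls inside an NCCLGroupGuard; (2) under
// FLAGS_use_stream_safe_cuda_allocator, RecordStream pins each tensor's
// allocation to the launch stream; (3) only when the comm stream is used,
// UpdateWaitChain records an event so later compute work can wait on the
// collective. The stream choice itself, as a hedged sketch with stand-in
// types (the real code pulls the compute stream from phi::GPUContext and
// the comm stream from the cached per-place context):
struct Streams {
  int calc_stream;  // stand-in for the device's compute stream
  int comm_stream;  // stand-in for the dedicated NCCL stream
};

inline int PickStream(const Streams& s, bool use_calc_stream) {
  // true: reuse the compute stream, no cross-stream event needed;
  // false: use the comm stream and synchronize via recorded events.
  return use_calc_stream ? s.calc_stream : s.comm_stream;
}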
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
std::vector<phi::DenseTensor>& inputs,
@@ -889,117 +759,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Collective(
return task;
}
template <typename Fn>
void ProcessGroupNCCL::Collective(const phi::DenseTensor* in,
phi::DenseTensor* out,
Fn fn,
CommType op_type) {
std::vector<Place> places;
places.push_back(in->place());
const std::string& key = GetKeyFromPlaces(places);
{
std::lock_guard<std::mutex> lock(mutex_);
if (place_to_comm_ctx_.find(key) == place_to_comm_ctx_.end()) {
CreateNCCLManagerCache(key, places);
}
}
SyncDefaultStream(
places, place_to_calc_event_.at(key), places_to_ctx_.at(key));
// construct an uninitialized device guard
platform::CUDADeviceGuard cuda_guard;
if (FLAGS_use_stream_safe_cuda_allocator) {
cuda_guard.SetDevice(places[0]);
memory::RecordStream(in->Holder(), places_to_ctx_.at(key)[0]->stream());
}
{
platform::NCCLGroupGuard nccl_guard;
cuda_guard.SetDevice(places[0]);
const auto& nccl_stream = places_to_ctx_.at(key)[0]->stream();
fn(in, out, places_to_ctx_.at(key)[0]->nccl_comm(), nccl_stream);
}
cuda_guard.SetDevice(places[0]);
}
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::PointToPoint(
std::vector<phi::DenseTensor>& tensors,
Fn fn,
int dst_rank,
CommType op_type,
bool sync_op,
bool use_calc_stream) {
const auto& places = GetPlaceList(tensors);
const auto& key = GetKeyFromPlaces(places);
{
std::lock_guard<std::mutex> lock(mutex_);
if (place_to_comm_ctx_.find(key) == place_to_comm_ctx_.end()) {
CreateNCCLManagerCache(key, places);
}
}
if (!use_calc_stream) {
SyncDefaultStream(
places, place_to_calc_event_.at(key), places_to_ctx_.at(key));
}
auto task =
CreateTask(places, rank_, op_type, tensors, sync_op, use_calc_stream);
platform::CUDADeviceGuard cuda_guard;
{
platform::NCCLGroupGuard nccl_guard;
for (size_t i = 0; i < tensors.size(); ++i) {
cuda_guard.SetDevice(places[i]);
gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_.at(key)[i]->stream();
}
fn(tensors[i],
places_to_ctx_.at(key)[i]->nccl_comm(),
nccl_stream,
dst_rank);
}
}
if (FLAGS_use_stream_safe_cuda_allocator) {
for (size_t i = 0; i < tensors.size(); ++i) {
cuda_guard.SetDevice(places[i]);
gpuStream_t nccl_stream;
if (use_calc_stream) {
nccl_stream =
static_cast<phi::GPUContext*>(
platform::DeviceContextPool::Instance().Get(places[i]))
->stream();
} else {
nccl_stream = places_to_ctx_.at(key)[i]->stream();
}
memory::RecordStream(tensors[i].Holder(), nccl_stream);
}
}
if (!use_calc_stream) {
for (size_t i = 0; i < tensors.size(); ++i) {
cuda_guard.SetDevice(places[i]);
task->UpdateWaitChain(*places_to_ctx_.at(key)[i]);
}
}
return task;
}
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::PointToPoint(
std::vector<phi::DenseTensor>& tensors,
@@ -1290,52 +1049,6 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll(
CommType::ALLTOALL);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
bool sync_op,
bool use_calc_stream) {
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(in_tensors),
true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
PADDLE_ENFORCE_EQ(
CheckTensorsInCudaPlace(out_tensors),
true,
platform::errors::InvalidArgument("All inputs should be in CudaPlace."));
return Collective(
in_tensors,
out_tensors,
[&](phi::DenseTensor& input,
phi::DenseTensor& output,
ncclComm_t comm,
const gpuStream_t& stream) {
size_t offset = 0;
GroupStart();
for (auto i = 0; i < size_; i++) {
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclSend(
GetPointerByOffset(input.data(), offset, input.dtype()),
input.numel() / size_,
platform::ToNCCLDataType(input.dtype()),
i,
comm,
stream));
PADDLE_ENFORCE_GPU_SUCCESS(platform::dynload::ncclRecv(
GetPointerByOffset(output.data(), offset, input.dtype()),
input.numel() / size_,
platform::ToNCCLDataType(input.dtype()),
i,
comm,
stream));
offset += input.numel() / size_;
}
GroupEnd();
},
CommType::ALLTOALL,
sync_op,
use_calc_stream);
}
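// Hedged illustration of the chunking in the removed AllToAll lambda above:
// each rank exchanges input.numel() / size_ elements with every peer, and
// the shared send/recv offset advances by one chunk per peer (plain integers
// here instead of DenseTensor/NCCL; values are illustrative):
#include <cstdio>

int main() {
  const int group_size = 4;              // size_ in the code above
  const int numel = 8;                   // input.numel(), assumed divisible
  const int chunk = numel / group_size;  // elements exchanged with each peer
  int offset = 0;
  for (int peer = 0; peer < group_size; ++peer) {
    // mirrors the paired ncclSend/ncclRecv inside GroupStart()/GroupEnd()
    std::printf("peer %d: send/recv %d elems at offset %d\n",
                peer, chunk, offset);
    offset += chunk;                     // offset += input.numel() / size_;
  }
  return 0;
}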
std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Reduce(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
......
@@ -68,12 +68,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream {
int rank,
CommType CommType,
const std::vector<phi::DenseTensor>& inputs);
NCCLTask(const std::vector<Place>& places,
int rank,
CommType comm_type,
const std::vector<phi::DenseTensor>& inputs,
bool sync_op,
bool use_calc_stream);
private:
bool block_cpu_in_wait_{false};
@@ -192,12 +186,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream {
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors) override;
std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
bool sync_op,
bool use_calc_stream) override;
std::shared_ptr<ProcessGroup::Task> Reduce(
std::vector<phi::DenseTensor>& tensors,
std::vector<phi::DenseTensor>& out_tensors,
@@ -245,14 +233,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream {
CommType op_type,
const std::vector<phi::DenseTensor>& inputs);
std::shared_ptr<ProcessGroupNCCL::NCCLTask> CreateTask(
const std::vector<Place>& places,
int rank,
CommType op_type,
const std::vector<phi::DenseTensor>& inputs,
bool sync_op,
bool use_calc_stream);
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> Collective(
std::vector<phi::DenseTensor>& inputs, // NOLINT
@@ -260,21 +240,6 @@ class ProcessGroupNCCL final : public ProcessGroupStream {
Fn fn,
CommType op_type);
template <typename Fn>
std::shared_ptr<ProcessGroupStream::Task> Collective(
std::vector<phi::DenseTensor>& inputs, // NOLINT
std::vector<phi::DenseTensor>& outputs, // NOLINT
Fn fn,
CommType comm_type,
bool sync_op,
bool use_calc_stream);
template <typename Fn>
void Collective(const phi::DenseTensor*,
phi::DenseTensor*,
Fn fn,
CommType op_type);
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> PointToPoint(
std::vector<phi::DenseTensor>& tensors, // NOLINT
@@ -282,21 +247,9 @@ class ProcessGroupNCCL final : public ProcessGroupStream {
int dst_rank,
CommType op_type);
template <typename Fn>
std::shared_ptr<ProcessGroup::Task> PointToPoint(
std::vector<phi::DenseTensor>& tensors, // NOLINT
Fn fn,
int dst_rank,
CommType op_type,
bool sync_op,
bool use_calc_stream);
void CreateNCCLManagerCache(const std::string& places_key,
const std::vector<Place>& places);
void CheckSplitSizes(std::vector<int64_t>* split_sizes,
std::vector<int64_t> tensor_shape);
private:
std::shared_ptr<Store> store_;
std::unordered_map<std::string, platform::DeviceEvent>
......
@@ -236,42 +236,5 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Send(
"ProcessGroup%s does not support send.", GetBackendName()));
}
// TODO(sunyilun): methods below will be removed later
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllToAll(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
bool sync_op) {
return AllToAll(in_tensors,
out_tensors,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::AllToAll(
std::vector<phi::DenseTensor>& in_tensors,
std::vector<phi::DenseTensor>& out_tensors,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do alltoall", GetBackendName()));
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Recv(
std::vector<phi::DenseTensor>& tensors, int src_rank, bool sync_op) {
return Recv(tensors,
src_rank,
sync_op,
/*use_calc_stream*/ false);
}
std::shared_ptr<ProcessGroup::Task> ProcessGroupStream::Recv(
std::vector<phi::DenseTensor>& tensors,
int src_rank,
bool sync_op,
bool use_calc_stream) {
PADDLE_THROW(platform::errors::InvalidArgument(
"ProcessGroup%s does not support do recv", GetBackendName()));
}
} // namespace distributed
} // namespace paddle
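Both removed pairs above follow the same delegation pattern: the sync_op-only overload forwards to the stream-aware one with use_calc_stream fixed to false, and the stream-aware overload is a throwing default for concrete backends to override. A minimal sketch of that pattern (stand-in types, not the real signatures):

#include <memory>
#include <stdexcept>

struct Task {};

class StreamGroup {
 public:
  virtual ~StreamGroup() = default;
  // Convenience overload: callers that don't pick a stream get the
  // comm-stream path by default.
  std::shared_ptr<Task> Recv(int src_rank, bool sync_op) {
    return Recv(src_rank, sync_op, /*use_calc_stream=*/false);
  }
  // Extension point: stream-capable backends override this.
  virtual std::shared_ptr<Task> Recv(int /*src_rank*/, bool /*sync_op*/,
                                     bool /*use_calc_stream*/) {
    throw std::invalid_argument("recv not supported by this backend");
  }
};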
@@ -179,29 +179,6 @@ class ProcessGroupStream : public ProcessGroup {
int64_t numel,
bool sync_op,
bool use_calc_stream);
// TODO(sunyilun): methods below will be removed later
std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> AllToAll(
std::vector<phi::DenseTensor>& in_tensors, // NOLINT
std::vector<phi::DenseTensor>& out_tensors, // NOLINT
bool sync_op,
bool use_calc_stream);
std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors, // NOLINT
int src_rank,
bool sync_op) override;
virtual std::shared_ptr<ProcessGroup::Task> Recv(
std::vector<phi::DenseTensor>& tensors, // NOLINT
int src_rank,
bool sync_op,
bool use_calc_stream);
};
} // namespace distributed
......