From 4552be4842b6febb2acbc3b2f4e59e09884c80c8 Mon Sep 17 00:00:00 2001
From: Wen Sun <35923278+HermitSun@users.noreply.github.com>
Date: Sat, 3 Dec 2022 10:41:11 +0800
Subject: [PATCH] Refactor collective communication static check (#48646)

* refactor: classify static check

* refactor: rename to static_check & use forward decl

* refactor: switch to unary & binary funcs
---
 .../distributed/collective/CMakeLists.txt     |   2 +-
 .../fluid/distributed/collective/NCCLTools.cc | 104 ------------
 .../fluid/distributed/collective/NCCLTools.h  |  27 ---
 .../collective/ProcessGroupNCCL.cc            |  55 +++++--
 .../distributed/collective/static_check.cc    | 155 ++++++++++++++++++
 .../distributed/collective/static_check.h     |  77 +++++++++
 6 files changed, 273 insertions(+), 147 deletions(-)
 create mode 100644 paddle/fluid/distributed/collective/static_check.cc
 create mode 100644 paddle/fluid/distributed/collective/static_check.h

diff --git a/paddle/fluid/distributed/collective/CMakeLists.txt b/paddle/fluid/distributed/collective/CMakeLists.txt
index e5dc51c63f..83b42fd432 100644
--- a/paddle/fluid/distributed/collective/CMakeLists.txt
+++ b/paddle/fluid/distributed/collective/CMakeLists.txt
@@ -21,7 +21,7 @@ endif()
 if(WITH_NCCL OR WITH_RCCL)
   cc_library(
     processgroup_nccl
-    SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc
+    SRCS ProcessGroupNCCL.cc NCCLTools.cc Common.cc static_check.cc
     DEPS processgroup
          processgroup_stream
          place
diff --git a/paddle/fluid/distributed/collective/NCCLTools.cc b/paddle/fluid/distributed/collective/NCCLTools.cc
index 988232b617..a8c437bb12 100644
--- a/paddle/fluid/distributed/collective/NCCLTools.cc
+++ b/paddle/fluid/distributed/collective/NCCLTools.cc
@@ -44,109 +44,5 @@ std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID) {
   return oss.str();
 }
 
-void StaticCheckTensor(const phi::DenseTensor& tensor,
-                       int rank,
-                       int world_size) {
-  // place check
-  PADDLE_ENFORCE_EQ(
-      platform::is_gpu_place(tensor.place()),
-      true,
-      platform::errors::InvalidArgument("Tensor should be in GPU place."));
-  // rank check
-  PADDLE_ENFORCE_GE(rank,
-                    0,
-                    platform::errors::InvalidArgument(
-                        "Rank should be greater than or equal to 0."));
-  PADDLE_ENFORCE_LT(
-      rank,
-      world_size,
-      platform::errors::InvalidArgument("Rank is out of the process group."));
-}
-
-// static check for collective
-void StaticCheckTensors(const phi::DenseTensor& out_tensor,
-                        const phi::DenseTensor& in_tensor,
-                        int rank,
-                        int world_size,
-                        int out_size_factor,
-                        int in_size_factor) {
-  // place check
-  PADDLE_ENFORCE_EQ(platform::is_gpu_place(out_tensor.place()),
-                    true,
-                    platform::errors::InvalidArgument(
-                        "Output tensor should be in GPU place."));
-  PADDLE_ENFORCE_EQ(platform::is_gpu_place(in_tensor.place()),
-                    true,
-                    platform::errors::InvalidArgument(
-                        "Input tensor should be in GPU place."));
-  // rank check
-  PADDLE_ENFORCE_GE(rank,
-                    0,
-                    platform::errors::InvalidArgument(
-                        "Rank should be greater than or equal to 0."));
-  PADDLE_ENFORCE_LT(
-      rank,
-      world_size,
-      platform::errors::InvalidArgument("Rank is out of the process group."));
-  // shape check
-  int64_t out_size = out_tensor.numel();
-  PADDLE_ENFORCE_GT(out_size,
-                    0,
-                    platform::errors::InvalidArgument(
-                        "Size of output tensor should be greater than 0."));
-  int64_t in_size = in_tensor.numel();
-  PADDLE_ENFORCE_GT(in_size,
-                    0,
-                    platform::errors::InvalidArgument(
-                        "Size of input tensor should be greater than 0."));
-  PADDLE_ENFORCE_EQ(
-      out_size * out_size_factor,
-      in_size * in_size_factor,
-      platform::errors::InvalidArgument(
-          "Input and output tensors should have matching sizes."));
-  // dtype check
-  PADDLE_ENFORCE_EQ(
-      out_tensor.dtype(),
-      in_tensor.dtype(),
-      platform::errors::InvalidArgument(
-          "Input and output tensors should have the same data type."));
-}
-
-void StaticCheckTensorsSameShape(const phi::DenseTensor& out_tensor,
-                                 const phi::DenseTensor& in_tensor,
-                                 int rank,
-                                 int world_size) {
-  StaticCheckTensors(out_tensor,
-                     in_tensor,
-                     rank,
-                     world_size,
-                     /*out_size_factor*/ 1,
-                     /*in_size_factor*/ 1);
-}
-
-void StaticCheckTensorsScatterLikeShape(const phi::DenseTensor& out_tensor,
-                                        const phi::DenseTensor& in_tensor,
-                                        int rank,
-                                        int world_size) {
-  StaticCheckTensors(out_tensor,
-                     in_tensor,
-                     rank,
-                     world_size,
-                     /*out_size_factor*/ world_size,
-                     /*in_size_factor*/ 1);
-}
-
-void StaticCheckTensorsGatherLikeShape(const phi::DenseTensor& out_tensor,
-                                       const phi::DenseTensor& in_tensor,
-                                       int rank,
-                                       int world_size) {
-  StaticCheckTensors(out_tensor,
-                     in_tensor,
-                     rank,
-                     world_size,
-                     /*out_size_factor*/ 1,
-                     /*in_size_factor*/ world_size);
-}
-
 }  // namespace distributed
 }  // namespace paddle
diff --git a/paddle/fluid/distributed/collective/NCCLTools.h b/paddle/fluid/distributed/collective/NCCLTools.h
index a882dae2e9..37b1e0f114 100644
--- a/paddle/fluid/distributed/collective/NCCLTools.h
+++ b/paddle/fluid/distributed/collective/NCCLTools.h
@@ -63,32 +63,5 @@ ncclRedOp_t ToNCCLRedType(ReduceOp reduction);
 
 std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID);
 
-// static check for p2p
-void StaticCheckTensor(const phi::DenseTensor& tensor,
-                       int rank,
-                       int world_size);
-
-// static check for collective
-void StaticCheckTensors(const phi::DenseTensor& out_tensor,
-                        const phi::DenseTensor& in_tensor,
-                        int rank,
-                        int world_size,
-                        int out_size_factor,
-                        int in_size_factor);
-
-void StaticCheckTensorsSameShape(const phi::DenseTensor& out_tensor,
-                                 const phi::DenseTensor& in_tensor,
-                                 int rank,
-                                 int world_size);
-
-void StaticCheckTensorsScatterLikeShape(const phi::DenseTensor& out_tensor,
-                                        const phi::DenseTensor& in_tensor,
-                                        int rank,
-                                        int world_size);
-
-void StaticCheckTensorsGatherLikeShape(const phi::DenseTensor& out_tensor,
-                                       const phi::DenseTensor& in_tensor,
-                                       int rank,
-                                       int world_size);
 }  // namespace distributed
 }  // namespace paddle
diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
index e995161cf3..b5c44962dd 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -16,6 +16,7 @@
 
 #include "paddle/fluid/distributed/collective/Common.h"
 #include "paddle/fluid/distributed/collective/NCCLTools.h"
+#include "paddle/fluid/distributed/collective/static_check.h"
 #include "paddle/fluid/distributed/collective/utils.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #include "paddle/fluid/platform/place.h"
@@ -138,8 +139,11 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather(
   // numel > 0 indicates the tensor needs to be sliced
   const phi::DenseTensor& in_tensor_maybe_partial =
       numel > 0 ? GetPartialTensor(in_tensor, offset, numel) : in_tensor;
-  StaticCheckTensorsGatherLikeShape(
-      *out_tensor, in_tensor_maybe_partial, rank_, size_);
+  CommStaticCheck::GatherLikeShape(*out_tensor,
+                                   in_tensor_maybe_partial,
+                                   /*dst_rank*/ rank_,
+                                   /*cur_rank*/ rank_,
+                                   size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclAllGather(
@@ -162,7 +166,11 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
     const AllreduceOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
-  StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
+  CommStaticCheck::SameShape(*out_tensor,
+                             in_tensor,
+                             /*dst_rank*/ rank_,
+                             /*cur_rank*/ rank_,
+                             size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclAllReduce(
@@ -214,12 +222,13 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll(
   // NOTE: Since `all_to_all` needs other processes' participation, it cannot
   // simply be covered by static checks. Factors are set to 0 here to skip the
   // shape check. Its shape check will be done by dynamic checks in debug mode.
-  StaticCheckTensors(*out_tensor,
-                     in_tensor,
-                     rank_,
-                     size_,
-                     /*out_size_factor*/ 0,
-                     /*in_size_factor*/ 0);
+  CommStaticCheck::CheckShape(*out_tensor,
+                              in_tensor,
+                              /*dst_rank*/ rank_,
+                              /*cur_rank*/ rank_,
+                              size_,
+                              /*out_size_factor*/ 0,
+                              /*in_size_factor*/ 0);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         int64_t in_row_size = in_tensor.numel() / in_dim[0],
@@ -287,7 +296,11 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
     const BroadcastOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
-  StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
+  CommStaticCheck::SameShape(*out_tensor,
+                             in_tensor,
+                             /*dst_rank*/ rank_,
+                             /*cur_rank*/ rank_,
+                             size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         int root = opts.source_rank + opts.source_root;
@@ -312,7 +325,11 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Reduce(
     const ReduceOptions& opts,
     bool sync_op,
    bool use_calc_stream) {
-  StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
+  CommStaticCheck::SameShape(*out_tensor,
+                             in_tensor,
+                             /*dst_rank*/ opts.root_rank,
+                             /*cur_rank*/ rank_,
+                             size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclReduce(
@@ -337,7 +354,11 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::ReduceScatter(
     const ReduceScatterOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
-  StaticCheckTensorsScatterLikeShape(*out_tensor, in_tensor, rank_, size_);
+  CommStaticCheck::ScatterLikeShape(*out_tensor,
+                                    in_tensor,
+                                    /*dst_rank*/ rank_,
+                                    /*cur_rank*/ rank_,
+                                    size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclReduceScatter(
@@ -361,7 +382,11 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Scatter(
     const ScatterOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
-  StaticCheckTensorsScatterLikeShape(*out_tensor, in_tensor, rank_, size_);
+  CommStaticCheck::ScatterLikeShape(*out_tensor,
+                                    in_tensor,
+                                    /*dst_rank*/ opts.root_rank,
+                                    /*cur_rank*/ rank_,
+                                    size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         int64_t numel = in_tensor.numel() / size_;
@@ -418,7 +443,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
     tensor = &partial_tensor;
   }
 
-  StaticCheckTensor(*tensor, rank_, size_);
+  CommStaticCheck::SingleTensor(*tensor, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclRecv(
@@ -446,7 +471,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send(
   const phi::DenseTensor& tensor_maybe_partial =
       numel > 0 ? GetPartialTensor(tensor, offset, numel) : tensor;
 
-  StaticCheckTensor(tensor_maybe_partial, rank_, size_);
+  CommStaticCheck::SingleTensor(tensor_maybe_partial, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclSend(
diff --git a/paddle/fluid/distributed/collective/static_check.cc b/paddle/fluid/distributed/collective/static_check.cc
new file mode 100644
index 0000000000..98336db90d
--- /dev/null
+++ b/paddle/fluid/distributed/collective/static_check.cc
@@ -0,0 +1,155 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "paddle/fluid/distributed/collective/static_check.h"
+
+#include "paddle/fluid/platform/enforce.h"
+#include "paddle/fluid/platform/place.h"
+#include "paddle/phi/core/dense_tensor.h"
+#include "paddle/phi/core/errors.h"
+
+namespace paddle {
+namespace distributed {
+
+void CommStaticCheck::CheckRank(int rank, int world_size) {
+  PADDLE_ENFORCE_GE(rank,
+                    0,
+                    phi::errors::InvalidArgument(
+                        "Rank should be greater than or equal to 0."));
+  PADDLE_ENFORCE_LT(
+      rank,
+      world_size,
+      phi::errors::InvalidArgument("Rank is out of the process group."));
+}
+
+void CommStaticCheck::CheckPlace(const phi::DenseTensor& tensor) {
+  PADDLE_ENFORCE_EQ(
+      platform::is_gpu_place(tensor.place()),
+      true,
+      platform::errors::InvalidArgument("Tensor should be in GPU place."));
+}
+
+void CommStaticCheck::CheckPlace(const phi::DenseTensor& out_tensor,
+                                 const phi::DenseTensor& in_tensor) {
+  CheckPlace(out_tensor);
+  CheckPlace(in_tensor);
+  PADDLE_ENFORCE_EQ(
+      out_tensor.place(),
+      in_tensor.place(),
+      phi::errors::InvalidArgument(
+          "Input and output tensors should be on the same place."));
+}
+
+void CommStaticCheck::CheckDataType(const phi::DenseTensor& out_tensor,
+                                    const phi::DenseTensor& in_tensor) {
+  PADDLE_ENFORCE_EQ(
+      out_tensor.dtype(),
+      in_tensor.dtype(),
+      phi::errors::InvalidArgument(
+          "Input and output tensors should have the same data type."));
+}
+
+void CommStaticCheck::CheckShape(const phi::DenseTensor& tensor) {
+  PADDLE_ENFORCE_GT(
+      tensor.numel(),
+      0,
+      phi::errors::InvalidArgument("Size of tensor should be greater than 0."));
+}
+
+void CommStaticCheck::CheckShape(const phi::DenseTensor& out_tensor,
+                                 const phi::DenseTensor& in_tensor,
+                                 int out_size_factor,
+                                 int in_size_factor) {
+  CheckShape(out_tensor);
+  CheckShape(in_tensor);
+  int64_t out_size = out_tensor.numel(), in_size = in_tensor.numel();
+  PADDLE_ENFORCE_EQ(
+      out_size * out_size_factor,
+      in_size * in_size_factor,
+      phi::errors::InvalidArgument(
+          "Input and output tensors should have matching sizes."));
+}
+
+void CommStaticCheck::CheckShape(const phi::DenseTensor& out_tensor,
+                                 const phi::DenseTensor& in_tensor,
+                                 int dst_rank,
+                                 int cur_rank,
+                                 int world_size,
+                                 int out_size_factor,
+                                 int in_size_factor) {
+  CheckRank(dst_rank, world_size);
+  CheckRank(cur_rank, world_size);
+
+  CheckPlace(out_tensor, in_tensor);
+  CheckDataType(out_tensor, in_tensor);
+
+  if (dst_rank == cur_rank) {
+    CheckShape(out_tensor, in_tensor, out_size_factor, in_size_factor);
+  } else {
+    CheckShape(out_tensor);
+    CheckShape(in_tensor);
+  }
+}
+
+void CommStaticCheck::SingleTensor(const phi::DenseTensor& tensor,
+                                   int rank,
+                                   int world_size) {
+  CheckPlace(tensor);
+  CheckRank(rank, world_size);
+}
+
+void CommStaticCheck::SameShape(const phi::DenseTensor& out_tensor,
+                                const phi::DenseTensor& in_tensor,
+                                int dst_rank,
+                                int cur_rank,
+                                int world_size) {
+  CheckShape(out_tensor,
+             in_tensor,
+             dst_rank,
+             cur_rank,
+             world_size,
+             /*out_size_factor*/ 1,
+             /*in_size_factor*/ 1);
+}
+
+void CommStaticCheck::ScatterLikeShape(const phi::DenseTensor& out_tensor,
+                                       const phi::DenseTensor& in_tensor,
+                                       int dst_rank,
+                                       int cur_rank,
+                                       int world_size) {
+  CheckShape(out_tensor,
+             in_tensor,
+             dst_rank,
+             cur_rank,
+             world_size,
+             /*out_size_factor*/ world_size,
+             /*in_size_factor*/ 1);
+}
+
+void CommStaticCheck::GatherLikeShape(const phi::DenseTensor& out_tensor,
+                                      const phi::DenseTensor& in_tensor,
+                                      int dst_rank,
+                                      int cur_rank,
+                                      int world_size) {
+  CheckShape(out_tensor,
+             in_tensor,
+             dst_rank,
+             cur_rank,
+             world_size,
+             /*out_size_factor*/ 1,
+             /*in_size_factor*/ world_size);
+}
+
+}  // namespace distributed
+}  // namespace paddle
diff --git a/paddle/fluid/distributed/collective/static_check.h b/paddle/fluid/distributed/collective/static_check.h
new file mode 100644
index 0000000000..5dcb17e505
--- /dev/null
+++ b/paddle/fluid/distributed/collective/static_check.h
@@ -0,0 +1,77 @@
+// Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#pragma once
+
+// forward declaration to reduce deps
+namespace phi {
+class DenseTensor;
+}
+
+namespace paddle {
+namespace distributed {
+
+struct CommStaticCheck {
+  static void CheckRank(int rank, int world_size);
+
+  static void CheckPlace(const phi::DenseTensor& tensor);
+
+  static void CheckPlace(const phi::DenseTensor& out_tensor,
+                         const phi::DenseTensor& in_tensor);
+
+  static void CheckDataType(const phi::DenseTensor& out_tensor,
+                            const phi::DenseTensor& in_tensor);
+
+  static void CheckShape(const phi::DenseTensor& tensor);
+
+  static void CheckShape(const phi::DenseTensor& out_tensor,
+                         const phi::DenseTensor& in_tensor,
+                         int out_size_factor,
+                         int in_size_factor);
+
+  static void CheckShape(const phi::DenseTensor& out_tensor,
+                         const phi::DenseTensor& in_tensor,
+                         int dst_rank,
+                         int cur_rank,
+                         int world_size,
+                         int out_size_factor,
+                         int in_size_factor);
+
+  // for p2p
+  static void SingleTensor(const phi::DenseTensor& tensor,
+                           int rank,
+                           int world_size);
+
+  // for collective
+  static void SameShape(const phi::DenseTensor& out_tensor,
+                        const phi::DenseTensor& in_tensor,
+                        int dst_rank,
+                        int cur_rank,
+                        int world_size);
+
+  static void ScatterLikeShape(const phi::DenseTensor& out_tensor,
+                               const phi::DenseTensor& in_tensor,
+                               int dst_rank,
+                               int cur_rank,
+                               int world_size);
+
+  static void GatherLikeShape(const phi::DenseTensor& out_tensor,
+                              const phi::DenseTensor& in_tensor,
+                              int dst_rank,
+                              int cur_rank,
+                              int world_size);
+};
+
+}  // namespace distributed
+}  // namespace paddle
-- 
GitLab
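
Reviewer note (appended after the patch, not part of the diff): the refactor folds every collective's shape requirement into a single invariant enforced by CommStaticCheck::CheckShape, namely out_size * out_size_factor == in_size * in_size_factor. The standalone C++ sketch below illustrates that invariant under stated assumptions: it uses plain int64_t sizes instead of phi::DenseTensor, and SizesMatch is a hypothetical helper that exists only in this example; the factor relation itself is taken from the patch.

#include <cassert>
#include <cstdint>

// Checks the size relation that CommStaticCheck::CheckShape enforces:
// the output size scaled by out_size_factor must equal the input size
// scaled by in_size_factor.
bool SizesMatch(int64_t out_size,
                int64_t in_size,
                int out_size_factor,
                int in_size_factor) {
  return out_size * out_size_factor == in_size * in_size_factor;
}

int main() {
  const int world_size = 4;  // ranks in the process group
  const int64_t chunk = 8;   // elements each rank contributes

  // SameShape (all_reduce, broadcast, reduce): factors 1/1.
  assert(SizesMatch(chunk, chunk, 1, 1));

  // ScatterLikeShape (reduce_scatter, scatter): the input holds world_size
  // chunks and the output holds one, so out * world_size == in * 1.
  assert(SizesMatch(chunk, chunk * world_size, world_size, 1));

  // GatherLikeShape (all_gather): the output holds world_size chunks,
  // so out * 1 == in * world_size.
  assert(SizesMatch(chunk * world_size, chunk, 1, world_size));

  // all_to_all passes factors 0/0, which makes the equality hold trivially;
  // this is how the patch skips the static shape check for that op.
  assert(SizesMatch(3, 5, 0, 0));
  return 0;
}

Note also that CheckShape compares sizes only when dst_rank == cur_rank and otherwise just requires both tensors to be non-empty, which is why Reduce and Scatter now pass opts.root_rank as /*dst_rank*/ while the symmetric collectives pass rank_ for both arguments.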