From d828ca460a89c2ce88be15bb5cdb76c676decf91 Mon Sep 17 00:00:00 2001
From: Wen Sun <35923278+HermitSun@users.noreply.github.com>
Date: Wed, 23 Nov 2022 21:27:06 +0800
Subject: [PATCH] Add static checks for collective communication on NCCL (#48256)

* feat: static check
---
 .../fluid/distributed/collective/NCCLTools.cc  | 104 ++++++++++++++++++
 .../fluid/distributed/collective/NCCLTools.h   |  27 +++++
 .../collective/ProcessGroupNCCL.cc             |  21 ++++
 .../distributed/collective/ProcessGroupNCCL.h  |   5 +-
 paddle/fluid/distributed/collective/utils.h    |   2 +-
 .../communication/stream/all_gather.py         |  23 ----
 .../communication/stream/all_to_all.py         |  24 ----
 .../communication/stream/reduce_scatter.py     |  25 -----
 .../communication/stream/scatter.py            |  22 ----
 9 files changed, 156 insertions(+), 97 deletions(-)

diff --git a/paddle/fluid/distributed/collective/NCCLTools.cc b/paddle/fluid/distributed/collective/NCCLTools.cc
index a8c437bb122..988232b6171 100644
--- a/paddle/fluid/distributed/collective/NCCLTools.cc
+++ b/paddle/fluid/distributed/collective/NCCLTools.cc
@@ -44,5 +44,109 @@ std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID) {
   return oss.str();
 }
 
+void StaticCheckTensor(const phi::DenseTensor& tensor,
+                       int rank,
+                       int world_size) {
+  // place check
+  PADDLE_ENFORCE_EQ(
+      platform::is_gpu_place(tensor.place()),
+      true,
+      platform::errors::InvalidArgument("Tensor should be in GPU place."));
+  // rank check
+  PADDLE_ENFORCE_GE(rank,
+                    0,
+                    platform::errors::InvalidArgument(
+                        "Rank should be greater than or equal to 0."));
+  PADDLE_ENFORCE_LT(
+      rank,
+      world_size,
+      platform::errors::InvalidArgument("Rank is out of the process group."));
+}
+
+// static check for collective
+void StaticCheckTensors(const phi::DenseTensor& out_tensor,
+                        const phi::DenseTensor& in_tensor,
+                        int rank,
+                        int world_size,
+                        int out_size_factor,
+                        int in_size_factor) {
+  // place check
+  PADDLE_ENFORCE_EQ(platform::is_gpu_place(out_tensor.place()),
+                    true,
+                    platform::errors::InvalidArgument(
+                        "Output tensor should be in GPU place."));
+  PADDLE_ENFORCE_EQ(platform::is_gpu_place(in_tensor.place()),
+                    true,
+                    platform::errors::InvalidArgument(
+                        "Input tensor should be in GPU place."));
+  // rank check
+  PADDLE_ENFORCE_GE(rank,
+                    0,
+                    platform::errors::InvalidArgument(
+                        "Rank should be greater than or equal to 0."));
+  PADDLE_ENFORCE_LT(
+      rank,
+      world_size,
+      platform::errors::InvalidArgument("Rank is out of the process group."));
+  // shape check
+  int64_t out_size = out_tensor.numel();
+  PADDLE_ENFORCE_GT(out_size,
+                    0,
+                    platform::errors::InvalidArgument(
+                        "Size of output tensor should be greater than 0."));
+  int64_t in_size = in_tensor.numel();
+  PADDLE_ENFORCE_GT(in_size,
+                    0,
+                    platform::errors::InvalidArgument(
+                        "Size of input tensor should be greater than 0."));
+  PADDLE_ENFORCE_EQ(
+      out_size * out_size_factor,
+      in_size * in_size_factor,
+      platform::errors::InvalidArgument(
+          "Input and output tensors should have matching sizes."));
+  // dtype check
+  PADDLE_ENFORCE_EQ(
+      out_tensor.dtype(),
+      in_tensor.dtype(),
+      platform::errors::InvalidArgument(
+          "Input and output tensors should have the same data type."));
+}
+
+void StaticCheckTensorsSameShape(const phi::DenseTensor& out_tensor,
+                                 const phi::DenseTensor& in_tensor,
+                                 int rank,
+                                 int world_size) {
+  StaticCheckTensors(out_tensor,
+                     in_tensor,
+                     rank,
+                     world_size,
+                     /*out_size_factor*/ 1,
+                     /*in_size_factor*/ 1);
+}
+
+void StaticCheckTensorsScatterLikeShape(const phi::DenseTensor& out_tensor,
+                                        const phi::DenseTensor& in_tensor,
+                                        int rank,
+                                        int world_size) {
+  StaticCheckTensors(out_tensor,
+                     in_tensor,
+                     rank,
+                     world_size,
+                     /*out_size_factor*/ world_size,
+                     /*in_size_factor*/ 1);
+}
+
+void StaticCheckTensorsGatherLikeShape(const phi::DenseTensor& out_tensor,
+                                       const phi::DenseTensor& in_tensor,
+                                       int rank,
+                                       int world_size) {
+  StaticCheckTensors(out_tensor,
+                     in_tensor,
+                     rank,
+                     world_size,
+                     /*out_size_factor*/ 1,
+                     /*in_size_factor*/ world_size);
+}
+
 }  // namespace distributed
 }  // namespace paddle
diff --git a/paddle/fluid/distributed/collective/NCCLTools.h b/paddle/fluid/distributed/collective/NCCLTools.h
index 37b1e0f114c..a882dae2e99 100644
--- a/paddle/fluid/distributed/collective/NCCLTools.h
+++ b/paddle/fluid/distributed/collective/NCCLTools.h
@@ -63,5 +63,32 @@ ncclRedOp_t ToNCCLRedType(ReduceOp reduction);
 
 std::string SerializeNCCLUniqueId(const ncclUniqueId& ncclID);
 
+// static check for p2p
+void StaticCheckTensor(const phi::DenseTensor& tensor,
+                       int rank,
+                       int world_size);
+
+// static check for collective
+void StaticCheckTensors(const phi::DenseTensor& out_tensor,
+                        const phi::DenseTensor& in_tensor,
+                        int rank,
+                        int world_size,
+                        int out_size_factor,
+                        int in_size_factor);
+
+void StaticCheckTensorsSameShape(const phi::DenseTensor& out_tensor,
+                                 const phi::DenseTensor& in_tensor,
+                                 int rank,
+                                 int world_size);
+
+void StaticCheckTensorsScatterLikeShape(const phi::DenseTensor& out_tensor,
+                                        const phi::DenseTensor& in_tensor,
+                                        int rank,
+                                        int world_size);
+
+void StaticCheckTensorsGatherLikeShape(const phi::DenseTensor& out_tensor,
+                                       const phi::DenseTensor& in_tensor,
+                                       int rank,
+                                       int world_size);
 }  // namespace distributed
 }  // namespace paddle
diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
index 96666f50c91..e995161cf30 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -15,6 +15,7 @@
 #include "paddle/fluid/distributed/collective/ProcessGroupNCCL.h"
 
 #include "paddle/fluid/distributed/collective/Common.h"
+#include "paddle/fluid/distributed/collective/NCCLTools.h"
 #include "paddle/fluid/distributed/collective/utils.h"
 #include "paddle/fluid/platform/device/gpu/nccl_helper.h"
 #include "paddle/fluid/platform/place.h"
@@ -137,6 +138,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllGather(
   // numel > 0 indicates the tensor need to be sliced
   const phi::DenseTensor& in_tensor_maybe_partial =
       numel > 0 ? GetPartialTensor(in_tensor, offset, numel) : in_tensor;
+  StaticCheckTensorsGatherLikeShape(
+      *out_tensor, in_tensor_maybe_partial, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclAllGather(
@@ -159,6 +162,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllReduce(
     const AllreduceOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
+  StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclAllReduce(
@@ -207,6 +211,15 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::AllToAll(
   CheckSizeOnEachRank(out_dim, out_size_each_rank, size_);
   CheckSizeOnEachRank(in_dim, in_size_each_rank, size_);
 
+  // NOTE: Since `all_to_all` needs other processes' participation, it cannot
+  // simply be covered by static checks. Factors are set to 0 here to skip the
+  // shape check. Its shape check will be done by dynamic checks in debug mode.
+  StaticCheckTensors(*out_tensor,
+                     in_tensor,
+                     rank_,
+                     size_,
+                     /*out_size_factor*/ 0,
+                     /*in_size_factor*/ 0);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         int64_t in_row_size = in_tensor.numel() / in_dim[0],
@@ -274,6 +287,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
     const BroadcastOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
+  StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         int root = opts.source_rank + opts.source_root;
@@ -298,6 +312,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Reduce(
     const ReduceOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
+  StaticCheckTensorsSameShape(*out_tensor, in_tensor, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclReduce(
@@ -322,6 +337,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::ReduceScatter(
     const ReduceScatterOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
+  StaticCheckTensorsScatterLikeShape(*out_tensor, in_tensor, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclReduceScatter(
@@ -345,6 +361,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Scatter(
     const ScatterOptions& opts,
     bool sync_op,
     bool use_calc_stream) {
+  StaticCheckTensorsScatterLikeShape(*out_tensor, in_tensor, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         int64_t numel = in_tensor.numel() / size_;
@@ -400,6 +417,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Recv(
     partial_tensor = GetPartialTensor(*tensor, offset, numel);
     tensor = &partial_tensor;
   }
+
+  StaticCheckTensor(*tensor, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclRecv(
@@ -426,6 +445,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Send(
   // numel > 0 indicates the tensor need to be sliced
   const phi::DenseTensor& tensor_maybe_partial =
       numel > 0 ? GetPartialTensor(tensor, offset, numel) : tensor;
+
+  StaticCheckTensor(tensor_maybe_partial, rank_, size_);
   return RunFnInNCCLEnv(
       [&](ncclComm_t comm, gpuStream_t stream) {
         NCCL_CHECK(platform::dynload::ncclSend(
diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
index 5153b7a678d..2a184e182aa 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
+++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.h
@@ -210,6 +210,8 @@ class ProcessGroupNCCL final : public ProcessGroupStream {
   void CreateNCCLEnvCache(const Place& place, const std::string& place_key);
 
+  void SyncCalcStream(const Place& place);
+
   std::shared_ptr<ProcessGroup::Task> RunFnInNCCLEnv(
       std::function<void(ncclComm_t, gpuStream_t)> fn,
       const phi::DenseTensor& tensor,
@@ -217,8 +219,6 @@
       bool sync_op,
       bool use_calc_stream);
 
-  void SyncCalcStream(const Place& place);
-
   // TODO(sunyilun): methods below will be removed later
   std::shared_ptr<ProcessGroupNCCL::NCCLTask> CreateTask(
       std::vector<Place> places,
@@ -245,6 +245,7 @@ class ProcessGroupNCCL final : public ProcessGroupStream {
 
  private:
   std::shared_ptr<Store> store_;
+
   std::unordered_map<std::string, platform::DeviceEvent>
       place_to_calc_event_;  // event on calc stream
   std::unordered_map<std::string, phi::GPUContext*> place_to_calc_ctx_;
diff --git a/paddle/fluid/distributed/collective/utils.h b/paddle/fluid/distributed/collective/utils.h
index a730a47dd0d..5b98a363570 100644
--- a/paddle/fluid/distributed/collective/utils.h
+++ b/paddle/fluid/distributed/collective/utils.h
@@ -19,7 +19,7 @@
 namespace paddle {
 namespace distributed {
 
-inline phi::DenseTensor GetPartialTensor(const phi::DenseTensor &tensor,
+inline phi::DenseTensor GetPartialTensor(const phi::DenseTensor& tensor,
                                          int64_t offset,
                                          int64_t numel) {
   phi::DenseTensor tensor_flattened;
diff --git a/python/paddle/distributed/communication/stream/all_gather.py b/python/paddle/distributed/communication/stream/all_gather.py
index 12f9e08f9d5..1e3344d0dbb 100644
--- a/python/paddle/distributed/communication/stream/all_gather.py
+++ b/python/paddle/distributed/communication/stream/all_gather.py
@@ -17,32 +17,11 @@ import paddle.fluid.framework as framework
 from paddle.distributed import collective
 
 
-def _check_tensor_shape(tensor, shape, nranks=1):
-    expect_shape = list(shape)
-    expect_shape[0] *= nranks
-    if list(tensor.shape) != expect_shape:
-        raise RuntimeError("The tensor for all_gather is not correctly-sized.")
-
-
-def _check_tensor_list_shape(tensor_list, shape, nranks=1):
-    if len(tensor_list) != nranks:
-        raise RuntimeError(
-            "The tensor_list for all_gather is not correctly-sized."
-        )
-    for tensor in tensor_list:
-        if tensor.shape != shape:
-            raise RuntimeError(
-                "The tensor_list for all_gather is not correctly-sized."
-            )
-
-
 def _all_gather_into_tensor_in_dygraph(
     out_tensor, in_tensor, group, sync_op, use_calc_stream
 ):
     group = collective._get_default_group() if group is None else group
 
-    _check_tensor_shape(out_tensor, in_tensor.shape, group.nranks)
-
     if use_calc_stream:
         return group.process_group.all_gather_into_tensor_on_calc_stream(
             out_tensor,
@@ -65,8 +44,6 @@ def _all_gather_in_dygraph(
 
     if len(tensor_list) == 0:
         tensor_list += [paddle.empty_like(tensor) for _ in range(group.nranks)]
-    else:
-        _check_tensor_list_shape(tensor_list, tensor.shape, group.nranks)
 
     if use_calc_stream:
         return group.process_group.all_gather_on_calc_stream(
diff --git a/python/paddle/distributed/communication/stream/all_to_all.py b/python/paddle/distributed/communication/stream/all_to_all.py
index 2787c6a3d4d..cc3b473d90d 100644
--- a/python/paddle/distributed/communication/stream/all_to_all.py
+++ b/python/paddle/distributed/communication/stream/all_to_all.py
@@ -23,29 +23,9 @@ from paddle.distributed.communication.group import (
 )
 
 
-def _check_tensor_shape(tensor, shape, nranks=1):
-    if tensor.shape != shape:
-        raise RuntimeError('The tensor for alltoall is not correctly-sized.')
-
-
-def _check_tensor_list_shape(tensor_list, shape, nranks=1):
-    if len(tensor_list) != nranks:
-        raise RuntimeError(
-            'The tensor_list for alltoall is not correctly-sized.'
-        )
-    for tensor in tensor_list:
-        if tensor.shape != shape:
-            raise RuntimeError(
-                'The tensor_list for alltoall is not correctly-sized.'
-            )
-
-
 def _all_to_all_tensor_in_dygraph(
     out_tensor, in_tensor, group, sync_op, use_calc_stream
 ):
-
-    _check_tensor_shape(out_tensor, in_tensor.shape, group.nranks)
-
     if use_calc_stream:
         return group.process_group.all_to_all_tensor_on_calc_stream(
             in_tensor, out_tensor
@@ -68,10 +48,6 @@ def _all_to_all_in_dygraph(
         out_tensor_list += [
             paddle.empty_like(tensor) for tensor in in_tensor_list
         ]
-    else:
-        _check_tensor_list_shape(
-            out_tensor_list, in_tensor_list[0].shape, group.nranks
-        )
 
     if use_calc_stream:
         return group.process_group.all_to_all_on_calc_stream(
diff --git a/python/paddle/distributed/communication/stream/reduce_scatter.py b/python/paddle/distributed/communication/stream/reduce_scatter.py
index 4d26e8d2b66..80e1ae7aa21 100644
--- a/python/paddle/distributed/communication/stream/reduce_scatter.py
+++ b/python/paddle/distributed/communication/stream/reduce_scatter.py
@@ -21,27 +21,6 @@ from paddle.distributed.communication.group import (
 from paddle.distributed.communication.reduce import _get_reduce_op, ReduceOp
 
 
-def _check_tensor_shape(tensor, shape, nranks=1):
-    expect_shape = list(shape)
-    expect_shape[0] //= nranks
-    if list(tensor.shape) != expect_shape:
-        raise RuntimeError(
-            "The in_tensor for reduce_scatter is not correctly-sized."
-        )
-
-
-def _check_tensor_list_shape(tensor_list, shape, nranks=1):
-    if len(tensor_list) != nranks:
-        raise RuntimeError(
-            "The tensor_list for reduce_scatter is not correctly-sized."
-        )
-    for tensor in tensor_list:
-        if tensor.shape != shape:
-            raise RuntimeError(
-                "The tensor_list for reduce_scatter is not correctly-sized."
-            )
-
-
 def _reduce_scatter_tensor_in_dygraph(
     out_tensor,
     in_tensor,
@@ -53,8 +32,6 @@ def _reduce_scatter_tensor_in_dygraph(
 ):
     op_type = _get_reduce_op(op, caller)
 
-    _check_tensor_shape(out_tensor, in_tensor.shape, group.nranks)
-
     if use_calc_stream:
         return group.process_group.reduce_scatter_tensor_on_calc_stream(
             out_tensor, in_tensor, op_type
@@ -74,8 +51,6 @@ def _reduce_scatter_in_dygraph(
 ):
     op_type = _get_reduce_op(op, "reduce_scatter")
 
-    _check_tensor_list_shape(tensor_list, tensor.shape, group.nranks)
-
     if use_calc_stream:
         return group.process_group.reduce_scatter_on_calc_stream(
             tensor, tensor_list, op_type
diff --git a/python/paddle/distributed/communication/stream/scatter.py b/python/paddle/distributed/communication/stream/scatter.py
index 5767c2150d8..a1df9c71aee 100644
--- a/python/paddle/distributed/communication/stream/scatter.py
+++ b/python/paddle/distributed/communication/stream/scatter.py
@@ -25,31 +25,10 @@ from paddle.distributed.communication.group import (
 )
 
 
-def _check_tensor_shape(tensor, shape, nranks=1):
-    expect_shape = list(shape)
-    expect_shape[0] //= nranks
-    if list(tensor.shape) != expect_shape:
-        raise RuntimeError("The in_tensor for scatter is not correctly-sized.")
-
-
-def _check_tensor_list_shape(tensor_list, shape, nranks=1):
-    if len(tensor_list) != nranks:
-        raise RuntimeError(
-            "The tensor_list for scatter is not correctly-sized."
-        )
-    for tensor in tensor_list:
-        if tensor.shape != shape:
-            raise RuntimeError(
-                "The tensor_list for scatter is not correctly-sized."
-            )
-
-
 def _scatter_tensor_in_dygraph(
     out_tensor, in_tensor, src_rank_in_group, group, sync_op, use_calc_stream
 ):
     nranks = group.nranks
-    if group.rank == src_rank_in_group:
-        _check_tensor_shape(out_tensor, in_tensor.shape, nranks)
 
     if use_calc_stream:
         return group.process_group.scatter_tensor_on_calc_stream(
@@ -74,7 +53,6 @@ def _scatter_in_dygraph(
             raise RuntimeError(
                 "The tensor_list should not be empty on src rank."
             )
-        _check_tensor_list_shape(tensor_list, tensor.shape, nranks)
     else:
         tensor_list = [tensor for _ in range(nranks)]
--
GitLab
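
For reference, the three shape wrappers added in NCCLTools.cc all reduce to one invariant enforced by StaticCheckTensors: out_numel * out_size_factor == in_numel * in_size_factor. The standalone C++ sketch below only illustrates that rule; the SizesMatch helper and the sample values are made up for the demonstration (the patch raises errors through PADDLE_ENFORCE_* rather than returning a bool), and only the factor choices (1/1 for same-shape, world_size/1 for scatter-like, 1/world_size for gather-like) are taken from the patch.

// Standalone sketch, independent of Paddle. SizesMatch is a demo-only
// stand-in for the size relation enforced by StaticCheckTensors.
#include <cassert>
#include <cstdint>
#include <iostream>

bool SizesMatch(int64_t out_numel,
                int64_t in_numel,
                int64_t out_factor,
                int64_t in_factor) {
  // Mirrors the "size should be greater than 0" checks in the patch.
  if (out_numel <= 0 || in_numel <= 0) return false;
  return out_numel * out_factor == in_numel * in_factor;
}

int main() {
  const int64_t world_size = 4;
  const int64_t n = 8;  // per-rank input element count

  // Same-shape collectives (all_reduce, broadcast, reduce): factors 1 / 1.
  assert(SizesMatch(n, n, 1, 1));

  // Gather-like (all_gather): the output holds world_size inputs,
  // factors 1 / world_size.
  assert(SizesMatch(n * world_size, n, 1, world_size));

  // Scatter-like (reduce_scatter, scatter): the output is 1/world_size of
  // the input, factors world_size / 1.
  assert(SizesMatch(n / world_size, n, world_size, 1));

  // A mis-sized all_gather output fails the check.
  assert(!SizesMatch(n * world_size - 1, n, 1, world_size));

  std::cout << "size-factor checks behaved as expected" << std::endl;
  return 0;
}

Passing 0 for both factors, as the all_to_all call in the patch does, makes both sides of the comparison equal to 0, so the size comparison is effectively skipped and only the place, rank, and dtype checks remain; the NOTE in the patch defers the real shape check to dynamic checks in debug mode.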