From 22ec915c3b32989cca021e8fe06aee7be7b1ee96 Mon Sep 17 00:00:00 2001 From: Roc <30228238+sljlp@users.noreply.github.com> Date: Mon, 5 Dec 2022 19:51:39 +0800 Subject: [PATCH] [0D Tensor]support 0d tensor for dist.scatter and dist.broadcast (#48638) --- paddle/phi/infermeta/multiary.cc | 3 +- paddle/phi/kernels/funcs/concat_funcs.h | 3 +- paddle/phi/kernels/gpu/concat_kernel.cu | 29 ++++++++++++ .../collective/process_group_nccl.py | 47 +++++++++++++++++++ 4 files changed, 80 insertions(+), 2 deletions(-) diff --git a/paddle/phi/infermeta/multiary.cc b/paddle/phi/infermeta/multiary.cc index 29c2436c8e8..70111217ad9 100644 --- a/paddle/phi/infermeta/multiary.cc +++ b/paddle/phi/infermeta/multiary.cc @@ -911,13 +911,14 @@ void ConcatInferMeta(const std::vector& x, // 1. calculate axis int rank = x.at(0)->dims().size(); PADDLE_ENFORCE_EQ( - axis >= -rank && axis < rank, + !rank || (axis >= -rank && axis < rank), true, phi::errors::InvalidArgument( "The axis is expected to be in range of [%d, %d), but got %d", -rank, rank, axis)); + axis = rank ? axis : 0; if (axis < 0) { axis = axis + rank; } diff --git a/paddle/phi/kernels/funcs/concat_funcs.h b/paddle/phi/kernels/funcs/concat_funcs.h index db965c2ef9b..61a0e6ad7e5 100644 --- a/paddle/phi/kernels/funcs/concat_funcs.h +++ b/paddle/phi/kernels/funcs/concat_funcs.h @@ -21,13 +21,14 @@ namespace funcs { static inline int64_t ComputeAxis(int64_t axis, int64_t rank) { PADDLE_ENFORCE_EQ( - axis >= -rank && axis < rank, + !rank || (axis >= -rank && axis < rank), true, phi::errors::InvalidArgument( "The axis is expected to be in range of [%d, %d), but got %d", -rank, rank, axis)); + axis = rank ? axis : 0; if (axis < 0) { axis = axis + rank; } diff --git a/paddle/phi/kernels/gpu/concat_kernel.cu b/paddle/phi/kernels/gpu/concat_kernel.cu index 80ff71b2158..497f78ca9fc 100644 --- a/paddle/phi/kernels/gpu/concat_kernel.cu +++ b/paddle/phi/kernels/gpu/concat_kernel.cu @@ -34,6 +34,35 @@ void ConcatKernel(const Context& dev_ctx, DenseTensor* out) { int64_t axis = axis_scalar.to(); + if (UNLIKELY(x[0]->dims().size() == 0)) { + // for dims is 0 specially + phi::DDim tmp_1dim, out_dims; + out_dims[0] = x.size(); + tmp_1dim[0] = 1; + + out->Resize(out_dims); + dev_ctx.template Alloc(out); + + size_t output_offset = 0; + for (auto* in : x) { + if (in->numel() == 0UL) { + continue; + } + auto in_stride = phi::stride_numel(tmp_1dim); + auto out_stride = phi::stride_numel(out->dims()); + paddle::operators::StridedNumelCopyWithAxis( + dev_ctx, + axis, + out->data() + output_offset, + out_stride, + in->data(), + in_stride, + in_stride[axis]); + output_offset += in_stride[axis]; + } + return; + } + axis = phi::funcs::ComputeAxis(axis, x[0]->dims().size()); std::vector x_dims; diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py index 4bee60642b5..2e585e1cc6d 100644 --- a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py +++ b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py @@ -167,6 +167,29 @@ class TestProcessGroupFp32(unittest.TestCase): print("test broadcast api ok") + # test broadcast with shape=[] + # rank 0 + x = np.random.random([]).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random([]).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + broadcast_result = paddle.assign(tensor_x) + if pg.rank() == 0: + task = dist.broadcast(tensor_x, 0, sync_op=False) + task.synchronize() + paddle.device.cuda.synchronize() + assert task.is_completed() + assert np.array_equal(broadcast_result, tensor_x) + else: + task = dist.broadcast(tensor_y, 0) + paddle.device.cuda.synchronize() + assert np.array_equal(broadcast_result, tensor_y) + assert tensor_y.shape == [] + + print("test broadcast api with shape=[] ok") + # test barrier # rank 0 if pg.rank() == 0: @@ -417,6 +440,30 @@ class TestProcessGroupFp32(unittest.TestCase): assert np.array_equal(tensor_y, out2) print("test scatter api ok\n") + # test Scatter with shape=[] + # rank 0 + x = np.random.random([]).astype(self.dtype) + y = np.random.random([]).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + if pg.rank() == 0: + in_1, in_2 = tensor_x, tensor_x + 1 + task = dist.scatter(tensor_y, [in_1, in_2], 0, sync_op=True) + paddle.device.cuda.synchronize() + # rank 1 + else: + task = dist.scatter(tensor_y, [], 0, sync_op=True) + task.wait() + paddle.device.cuda.synchronize() + out1 = paddle.assign(tensor_x) + out2 = paddle.assign(tensor_x + 1) + if pg.rank() == 0: + assert np.array_equal(tensor_y, out1) + else: + assert np.array_equal(tensor_y, out2), f"{tensor_y}, {out2}" + assert tensor_y.shape == [] + print("test scatter api with shape=[] ok\n") + # test send min # rank 0 x = np.random.random(self.shape).astype(self.dtype) -- GitLab