From 8a12f459b0631577c6532551ac66222a1cfa6ed9 Mon Sep 17 00:00:00 2001
From: lilong12
Date: Thu, 21 Apr 2022 17:44:51 +0800
Subject: [PATCH] [Cherry-pick] fix the bug for nccl barrier and alltoall
 (#42042)

* fix_nccl_barrier (#41970)

* be compatible with the old version of alltoall (#42007)

Co-authored-by: Baibaifan <39549453+Baibaifan@users.noreply.github.com>
---
 .../collective/ProcessGroupNCCL.cc        | 19 ++-----
 python/paddle/distributed/collective.py   | 16 ++++--
 .../tests/unittests/process_group_nccl.py | 50 +++++++++++++++++++
 3 files changed, 65 insertions(+), 20 deletions(-)

diff --git a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
index 12de7d116e2..86cc5b5db7c 100644
--- a/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
+++ b/paddle/fluid/distributed/collective/ProcessGroupNCCL.cc
@@ -353,21 +353,8 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Broadcast(
 
 std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
     const BarrierOptions& opts) {
-  std::vector<phi::GPUPlace> places;
-
-  if (!opts.place_ids.empty()) {
-    for (auto place_id : opts.place_ids) {
-      places.emplace_back(place_id);
-    }
-  } else if (!used_place_ids_.empty()) {
-    for (auto place_id : used_place_ids_) {
-      places.emplace_back(place_id);
-    }
-  } else {
-    auto numGPUs = GetSize();
-    int place_id = static_cast<int>(rank_ % numGPUs);
-    places.emplace_back(place_id);
-  }
+  // Only support single card single process
+  std::vector<phi::GPUPlace> places = {place_};
 
   std::vector<phi::DenseTensor> barrierTensors;
   barrierTensors.reserve(places.size());
@@ -375,7 +362,7 @@ std::shared_ptr<ProcessGroup::Task> ProcessGroupNCCL::Barrier(
   platform::CUDADeviceGuard gpuGuard;
   for (auto& place : places) {
     gpuGuard.SetDeviceIndex(place.GetDeviceId());
-    auto dt = full({1}, 0, phi::DataType::FLOAT32, phi::GPUPlace());
+    auto dt = full({1}, 0, phi::DataType::FLOAT32, place);
     barrierTensors.push_back(
         *std::dynamic_pointer_cast<phi::DenseTensor>(dt.impl()));
   }
diff --git a/python/paddle/distributed/collective.py b/python/paddle/distributed/collective.py
index 35ab1193c2b..b92b2a3c15d 100644
--- a/python/paddle/distributed/collective.py
+++ b/python/paddle/distributed/collective.py
@@ -860,9 +860,12 @@ def all_gather(tensor_list, tensor, group=None, use_calc_stream=True):
 
     if in_dygraph_mode():
         group = _get_default_group() if group is None else group
-        tensor_shape = list(tensor.shape)
-        tensor_shape[0] *= group.nranks
-        out = paddle.empty(tensor_shape, tensor.dtype)
+        if len(tensor_list) == 0:
+            tensor_shape = list(tensor.shape)
+            tensor_shape[0] *= group.nranks
+            out = paddle.empty(tensor_shape, tensor.dtype)
+        else:
+            out = paddle.concat(tensor_list, axis=0)
         task = group.process_group.all_gather(tensor, out)
         task.wait()
         tensor_list.clear()
@@ -1783,7 +1786,12 @@ def alltoall(in_tensor_list, out_tensor_list, group=None, use_calc_stream=True):
     temp = paddle.concat(in_tensor_list, axis=0)
     nranks = len(in_tensor_list)
     if in_dygraph_mode():
-        out = paddle.concat(out_tensor_list, axis=0)
+        if len(out_tensor_list) == 0:
+            tensor_shape = list(in_tensor_list[0].shape)
+            tensor_shape[0] *= nranks
+            out = paddle.empty(tensor_shape, in_tensor_list[0].dtype)
+        else:
+            out = paddle.concat(out_tensor_list, axis=0)
         task = group.process_group.alltoall(temp, out)
         task.wait()
         out_tensor_list.clear()
diff --git a/python/paddle/fluid/tests/unittests/process_group_nccl.py b/python/paddle/fluid/tests/unittests/process_group_nccl.py
index 7ae38b3bbc4..7aa83ad9079 100644
--- a/python/paddle/fluid/tests/unittests/process_group_nccl.py
+++ b/python/paddle/fluid/tests/unittests/process_group_nccl.py
@@ -185,6 +185,24 @@ class TestProcessGroupFp32(unittest.TestCase):
             assert np.array_equal(tensor_y, out_2)
             print("test allgather api ok\n")
 
+            if pg.rank() == 0:
+                task = pg.all_gather(tensor_x, tensor_out)
+                task.wait()
+                paddle.device.cuda.synchronize()
+            # rank 1
+            else:
+                tensor_out_list = []
+                task = dist.all_gather(
+                    tensor_out_list, tensor_y, use_calc_stream=False)
+                paddle.device.cuda.synchronize()
+                tensor_out = paddle.concat(tensor_out_list)
+                out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
+                out_2 = paddle.slice(tensor_out, [0], [out_shape[0] // 2],
+                                     [out_shape[0]])
+                assert np.array_equal(tensor_x, out_1)
+                assert np.array_equal(tensor_y, out_2)
+                print("test allgather api2 ok\n")
+
             # test alltoall
             # rank 0
             x = np.random.random(self.shape).astype(self.dtype)
@@ -219,6 +237,38 @@ class TestProcessGroupFp32(unittest.TestCase):
                 assert np.array_equal(out2_1, raw_tensor_x_2)
             print("test alltoall api ok\n")
 
+            x = np.random.random(self.shape).astype(self.dtype)
+            y = np.random.random(self.shape).astype(self.dtype)
+            out1 = np.random.random(self.shape).astype(self.dtype)
+            out2 = np.random.random(self.shape).astype(self.dtype)
+            tensor_x = paddle.to_tensor(x)
+            tensor_y = paddle.to_tensor(y)
+            tensor_out1 = paddle.to_tensor(out1)
+            tensor_out2 = paddle.to_tensor(out2)
+            raw_tensor_x_2 = paddle.slice(tensor_x, [0], [self.shape[0] // 2],
+                                          [self.shape[0]])
+            raw_tensor_y_1 = paddle.slice(tensor_y, [0], [0],
+                                          [self.shape[0] // 2])
+            if pg.rank() == 0:
+                task = pg.alltoall(tensor_x, tensor_out1)
+                task.wait()
+            # rank 1
+            else:
+                in_1, in_2 = paddle.split(tensor_y, 2)
+                out_1, out_2 = paddle.split(tensor_out2, 2)
+                out_tensor_list = []
+                task = dist.alltoall([in_1, in_2], out_tensor_list)
+                paddle.device.cuda.synchronize()
+                tensor_out2 = paddle.concat(out_tensor_list)
+            out1_2 = paddle.slice(tensor_out1, [0], [self.shape[0] // 2],
+                                  [self.shape[0]])
+            out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2])
+            if pg.rank() == 0:
+                assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy())
+            else:
+                assert np.array_equal(out2_1, raw_tensor_x_2)
+            print("test alltoall api2 ok\n")
+
             # test Reduce
             # rank 0
             x = np.random.random(self.shape).astype(self.dtype)
-- 
GitLab
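
A minimal usage sketch of the compatibility behavior restored by the collective.py changes above; it is not part of the patch. It assumes eager dygraph mode and a two-GPU launch via paddle.distributed.launch (the script name demo.py is hypothetical). Passing an empty output list lets all_gather/alltoall allocate the result; an old-style pre-populated list is still accepted, cleared, and refilled with the received tensors.

# Minimal sketch (not part of the patch). Assumes 2 GPUs, launched with:
#   python -m paddle.distributed.launch --gpus=0,1 demo.py
import numpy as np
import paddle
import paddle.distributed as dist

dist.init_parallel_env()

x = paddle.to_tensor(np.random.random([4, 8]).astype('float32'))

# New-style all_gather: pass an empty list; the API allocates and fills it.
gathered = []
dist.all_gather(gathered, x)

# Old-style all_gather: pass placeholder tensors; the list is cleared and
# refilled with the gathered results (out = concat(tensor_list) internally).
gathered = [paddle.empty_like(x), paddle.empty_like(x)]
dist.all_gather(gathered, x)

# alltoall follows the same rule: an empty out_tensor_list is allocated from
# in_tensor_list[0]'s shape and dtype; a non-empty one is cleared and refilled.
in_1, in_2 = paddle.split(x, 2)
received = []
dist.alltoall([in_1, in_2], received)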