diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py
index ba4ec91a80403957031e526a7103458801e0d9ad..6fcd90158f5e5cc028227803f0dd47bf6fd84dc1 100644
--- a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py
+++ b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py
@@ -68,6 +68,24 @@ class TestProcessGroupFp32(unittest.TestCase):
 
         print("test allreduce sum api ok")
 
+        # test allreduce sum with shape = []
+        # rank 0
+        x = np.random.random([]).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random([]).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        sum_result = tensor_x + tensor_y
+        if pg.rank() == 0:
+            task = dist.all_reduce(tensor_x)
+            assert np.array_equal(tensor_x, sum_result)
+        else:
+            task = dist.all_reduce(tensor_y)
+            assert np.array_equal(tensor_y, sum_result)
+
+        print("test allreduce sum api with shape = [] ok")
+
         # test allreduce max
         # rank 0
         x = np.random.random(self.shape).astype(self.dtype)
@@ -89,6 +107,27 @@ class TestProcessGroupFp32(unittest.TestCase):
 
         print("test allreduce max api ok")
 
+        # test allreduce max with shape = []
+        # rank 0
+        x = np.random.random([]).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random([]).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        max_result = paddle.maximum(tensor_x, tensor_y)
+
+        if pg.rank() == 0:
+            task = dist.all_reduce(tensor_x, dist.ReduceOp.MAX, sync_op=False)
+            task.wait()
+            assert np.array_equal(tensor_x, max_result)
+        else:
+            task = dist.all_reduce(tensor_y, dist.ReduceOp.MAX, sync_op=False)
+            task.wait()
+            assert np.array_equal(tensor_y, max_result)
+
+        print("test allreduce max api with shape = [] ok")
+
         # test allreduce min
         # rank 0
         x = np.random.random(self.shape).astype(self.dtype)
@@ -110,6 +149,27 @@ class TestProcessGroupFp32(unittest.TestCase):
 
         print("test allreduce min api ok")
 
+        # test allreduce min with shape = []
+        # rank 0
+        x = np.random.random([]).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random([]).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        min_result = paddle.minimum(tensor_x, tensor_y)
+
+        if pg.rank() == 0:
+            task = dist.all_reduce(tensor_x, dist.ReduceOp.MIN, sync_op=False)
+            task.wait()
+            assert np.array_equal(tensor_x, min_result)
+        else:
+            task = dist.all_reduce(tensor_y, dist.ReduceOp.MIN, sync_op=False)
+            task.wait()
+            assert np.array_equal(tensor_y, min_result)
+
+        print("test allreduce min api with shape [] ok")
+
         # test allreduce prod
         # rank 0
         x = np.random.random(self.shape).astype(self.dtype)
@@ -131,6 +191,27 @@ class TestProcessGroupFp32(unittest.TestCase):
 
         print("test allreduce prod api ok")
 
+        # test allreduce prod with shape = []
+        # rank 0
+        x = np.random.random([]).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random([]).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        prod_result = np.multiply(x, y)
+
+        if pg.rank() == 0:
+            task = dist.all_reduce(tensor_x, dist.ReduceOp.PROD, sync_op=False)
+            task.wait()
+            assert np.array_equal(tensor_x, prod_result)
+        else:
+            task = dist.all_reduce(tensor_y, dist.ReduceOp.PROD, sync_op=False)
+            task.wait()
+            assert np.array_equal(tensor_y, prod_result)
+
+        print("test allreduce prod api with shape = [] ok")
+
         # test broadcast
         # rank 0
         x = np.random.random(self.shape).astype(self.dtype)
@@ -236,6 +317,27 @@ class TestProcessGroupFp32(unittest.TestCase):
         assert np.array_equal(tensor_y, out_2)
         print("test allgather api2 ok\n")
 
+        # test allgather with shape = []
+        # rank 0
+        x = np.random.random([]).astype(self.dtype)
+        y = np.random.random([]).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        tensor_y = paddle.to_tensor(y)
+        tensor_out_list = []
+        if pg.rank() == 0:
+            task = dist.all_gather(tensor_out_list, tensor_x)
+            task.wait()
+            paddle.device.cuda.synchronize()
+        # rank 1
+        else:
+            task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False)
+            paddle.device.cuda.synchronize()
+        out_1 = tensor_out_list[0]
+        out_2 = tensor_out_list[1]
+        assert np.array_equal(tensor_x, out_1)
+        assert np.array_equal(tensor_y, out_2)
+        print("test allgather api with shape [] ok\n")
+
         # test alltoall
         # rank 0
         x = np.random.random(self.shape).astype(self.dtype)
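For context, the pattern the patch exercises is a collective on a 0-d (scalar) tensor, i.e. one created from np.random.random([]). Below is a minimal standalone sketch of that pattern outside the test harness; the file name, the init_parallel_env() setup, and the two-GPU launch command are assumptions for illustration, not part of the patch.

# zero_dim_allreduce_demo.py -- hypothetical file name, not part of the patch.
# Minimal sketch: all_reduce on a 0-d (scalar) tensor, assuming two GPUs and NCCL.
import numpy as np
import paddle
import paddle.distributed as dist

dist.init_parallel_env()

# np.random.random([]) yields a 0-d array, so the Paddle tensor has shape [].
x = np.random.random([]).astype("float32")
tensor_x = paddle.to_tensor(x)

# Default reduce op is SUM and sync_op=True, so the result is ready on return.
dist.all_reduce(tensor_x)
print(dist.get_rank(), tensor_x.shape, float(tensor_x))

Such a script would typically be launched with something like `python -m paddle.distributed.launch --gpus=0,1 zero_dim_allreduce_demo.py`, whereas the test above is driven by the process-group test runner and checks each rank's buffer against the locally computed sum/max/min/prod.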