From f53e5a04625b55a498ebf5784307ba743634e006 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A7=9C=E6=B0=B8=E4=B9=85?= <34344716+yjjiang11@users.noreply.github.com> Date: Sun, 11 Dec 2022 23:10:06 -0800 Subject: [PATCH] rm multinode eager guard tests (#48766) * rm multinode eager guard tests * remove unwanted tests * reset process_mpi test --- .../mn_dygraph_group_sharded_stage3.py | 4 +- .../multinode/mn_dygraph_sharding_stage2.py | 3 - .../collective/process_group_gloo.py | 294 +++--- .../unittests/collective/process_group_mpi.py | 64 +- .../collective/process_group_nccl.py | 876 +++++++++--------- 5 files changed, 599 insertions(+), 642 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_group_sharded_stage3.py b/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_group_sharded_stage3.py index 72f89eb25f..d08f1727f3 100644 --- a/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_group_sharded_stage3.py +++ b/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_group_sharded_stage3.py @@ -34,7 +34,6 @@ from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_stage3 import from paddle.distributed.fleet.meta_parallel.sharding.group_sharded_utils import ( GroupShardedScaler, ) -from paddle.fluid.framework import _test_eager_guard from paddle.nn import Linear epoch = 10 @@ -331,5 +330,4 @@ def test_stage2_stage3(): if __name__ == '__main__': - with _test_eager_guard(): - test_stage2_stage3() + test_stage2_stage3() diff --git a/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_sharding_stage2.py b/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_sharding_stage2.py index b088bf424b..651b0adf9e 100644 --- a/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_sharding_stage2.py +++ b/python/paddle/fluid/tests/unittests/collective/multinode/mn_dygraph_sharding_stage2.py @@ -29,7 +29,6 @@ from paddle.distributed.fleet.meta_optimizers.dygraph_optimizer.sharding_optimiz from paddle.distributed.fleet.meta_parallel.sharding.sharding_stage2 import ( ShardingStage2, ) -from paddle.fluid.framework import _test_eager_guard from paddle.nn import Linear seed = 2022 @@ -248,7 +247,5 @@ def test_dp_stage2(): if __name__ == '__main__': - with _test_eager_guard(): - pass fleet.init(is_collective=True, strategy=strategy) test_dp_stage2() diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py index ad2c91381a..c54101baec 100644 --- a/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py +++ b/python/paddle/fluid/tests/unittests/collective/process_group_gloo.py @@ -20,7 +20,6 @@ import numpy as np import paddle from paddle.fluid import core from paddle.fluid.dygraph.parallel import ParallelEnv -from paddle.fluid.framework import _test_eager_guard class TestProcessGroupFp32(unittest.TestCase): @@ -35,154 +34,151 @@ class TestProcessGroupFp32(unittest.TestCase): self.shape = (2, 10, 5) def test_create_process_group_gloo(self): - with _test_eager_guard(): - nranks = ParallelEnv().nranks - rank = ParallelEnv().local_rank - is_master = True if rank == 0 else False - store = paddle.fluid.core.TCPStore( - "127.0.0.1", 6272, is_master, nranks, 30 - ) - pg = paddle.fluid.core.ProcessGroupGloo.create(store, rank, nranks) - - # test allreduce sum - # rank 0 - paddle.device.set_device('cpu') - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - sum_result = x + y - if rank == 0: - task = pg.allreduce(tensor_x) - task.wait() - np.testing.assert_equal(tensor_x, sum_result) - else: - task = pg.allreduce(tensor_y) - task.wait() - np.testing.assert_equal(tensor_y, sum_result) - - print("test allreduce sum api ok") - - # test allreduce max - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - max_result = paddle.maximum(tensor_x, tensor_y) - - if rank == 0: - task = pg.allreduce(tensor_x, core.ReduceOp.MAX) - task.wait() - assert np.array_equal(tensor_x, max_result) - else: - task = pg.allreduce(tensor_y, core.ReduceOp.MAX) - task.wait() - assert np.array_equal(tensor_y, max_result) - - print("test allreduce max api ok") - - # test broadcast - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - broadcast_result = paddle.assign(tensor_x) - if rank == 0: - task = pg.broadcast(tensor_x, 0) - assert np.array_equal(broadcast_result, tensor_x) - else: - task = pg.broadcast(tensor_y, 0) - assert np.array_equal(broadcast_result, tensor_y) - print("test broadcast api ok") - - # test barrier - # rank 0 - if pg.rank() == 0: - task = pg.barrier() - task.wait() - # rank 1 - else: - task = pg.barrier() - task.wait() - - print("test barrier api ok\n") - - # test allgather - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - out_shape = list(self.shape) - out_shape[0] *= 2 - out = np.random.random(out_shape).astype(self.dtype) - tensor_out = paddle.to_tensor(out) - if pg.rank() == 0: - task = pg.all_gather(tensor_x, tensor_out) - task.wait() - paddle.device.cuda.synchronize() - # rank 1 - else: - task = pg.all_gather(tensor_y, tensor_out) - task.wait() - out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) - out_2 = paddle.slice( - tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] - ) - assert np.array_equal(tensor_x, out_1) - assert np.array_equal(tensor_y, out_2) - print("test allgather api ok\n") - - # test Reduce - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - sum_result = tensor_x + tensor_y - if pg.rank() == 0: - task = pg.reduce(tensor_x, 0) - task.wait() - # rank 1 - else: - task = pg.reduce(tensor_y, 0) - task.wait() - if pg.rank() == 0: - assert np.array_equal(tensor_x, sum_result) - print("test reduce sum api ok\n") - - # test Scatter - # rank 0 - in_shape = list(self.shape) - in_shape[0] *= 2 - x = np.random.random(in_shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - if pg.rank() == 0: - task = pg.scatter(tensor_x, tensor_y, 0) - task.wait() - # rank 1 - else: - task = pg.scatter(tensor_x, tensor_y, 0) - task.wait() - out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) - out2 = paddle.slice( - tensor_x, [0], [self.shape[0]], [self.shape[0] * 2] - ) - if pg.rank() == 0: - assert np.array_equal(tensor_y, out1) - else: - assert np.array_equal(tensor_y, out2) - print("test scatter api ok\n") + nranks = ParallelEnv().nranks + rank = ParallelEnv().local_rank + is_master = True if rank == 0 else False + store = paddle.fluid.core.TCPStore( + "127.0.0.1", 6272, is_master, nranks, 30 + ) + pg = paddle.fluid.core.ProcessGroupGloo.create(store, rank, nranks) + + # test allreduce sum + # rank 0 + paddle.device.set_device('cpu') + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + sum_result = x + y + if rank == 0: + task = pg.allreduce(tensor_x) + task.wait() + np.testing.assert_equal(tensor_x, sum_result) + else: + task = pg.allreduce(tensor_y) + task.wait() + np.testing.assert_equal(tensor_y, sum_result) + + print("test allreduce sum api ok") + + # test allreduce max + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + max_result = paddle.maximum(tensor_x, tensor_y) + + if rank == 0: + task = pg.allreduce(tensor_x, core.ReduceOp.MAX) + task.wait() + assert np.array_equal(tensor_x, max_result) + else: + task = pg.allreduce(tensor_y, core.ReduceOp.MAX) + task.wait() + assert np.array_equal(tensor_y, max_result) + + print("test allreduce max api ok") + + # test broadcast + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + broadcast_result = paddle.assign(tensor_x) + if rank == 0: + task = pg.broadcast(tensor_x, 0) + assert np.array_equal(broadcast_result, tensor_x) + else: + task = pg.broadcast(tensor_y, 0) + assert np.array_equal(broadcast_result, tensor_y) + print("test broadcast api ok") + + # test barrier + # rank 0 + if pg.rank() == 0: + task = pg.barrier() + task.wait() + # rank 1 + else: + task = pg.barrier() + task.wait() + + print("test barrier api ok\n") + + # test allgather + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + out_shape = list(self.shape) + out_shape[0] *= 2 + out = np.random.random(out_shape).astype(self.dtype) + tensor_out = paddle.to_tensor(out) + if pg.rank() == 0: + task = pg.all_gather(tensor_x, tensor_out) + task.wait() + paddle.device.cuda.synchronize() + # rank 1 + else: + task = pg.all_gather(tensor_y, tensor_out) + task.wait() + out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) + out_2 = paddle.slice( + tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] + ) + assert np.array_equal(tensor_x, out_1) + assert np.array_equal(tensor_y, out_2) + print("test allgather api ok\n") + + # test Reduce + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + sum_result = tensor_x + tensor_y + if pg.rank() == 0: + task = pg.reduce(tensor_x, 0) + task.wait() + # rank 1 + else: + task = pg.reduce(tensor_y, 0) + task.wait() + if pg.rank() == 0: + assert np.array_equal(tensor_x, sum_result) + print("test reduce sum api ok\n") + + # test Scatter + # rank 0 + in_shape = list(self.shape) + in_shape[0] *= 2 + x = np.random.random(in_shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + if pg.rank() == 0: + task = pg.scatter(tensor_x, tensor_y, 0) + task.wait() + # rank 1 + else: + task = pg.scatter(tensor_x, tensor_y, 0) + task.wait() + out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) + out2 = paddle.slice(tensor_x, [0], [self.shape[0]], [self.shape[0] * 2]) + if pg.rank() == 0: + assert np.array_equal(tensor_y, out1) + else: + assert np.array_equal(tensor_y, out2) + print("test scatter api ok\n") if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_mpi.py b/python/paddle/fluid/tests/unittests/collective/process_group_mpi.py index 2fb9deb110..f2fc9c498b 100644 --- a/python/paddle/fluid/tests/unittests/collective/process_group_mpi.py +++ b/python/paddle/fluid/tests/unittests/collective/process_group_mpi.py @@ -28,7 +28,7 @@ from paddle.distributed.collective import ( _set_group_map_by_name, ) from paddle.fluid import core -from paddle.fluid.framework import _set_expected_place, _test_eager_guard +from paddle.fluid.framework import _set_expected_place ctypes.CDLL("libmpi.so", mode=ctypes.RTLD_GLOBAL) @@ -444,51 +444,49 @@ class TestProcessGroup(unittest.TestCase): self.shape = (2, 10, 5) def test_create_process_group_mpi(self): - with _test_eager_guard(): - group = init_process_group() - pg = group.process_group + group = init_process_group() + pg = group.process_group + # test allreduce sum + test_allreduce_sum(pg, self.shape, self.dtype) - # test allreduce sum - test_allreduce_sum(pg, self.shape, self.dtype) + # test allreduce max + test_allreduce_max(pg, self.shape, self.dtype) - # test allreduce max - test_allreduce_max(pg, self.shape, self.dtype) + # test allreduce min + test_allreduce_min(pg, self.shape, self.dtype) - # test allreduce min - test_allreduce_min(pg, self.shape, self.dtype) + # test allreduce prod + test_allreduce_prod(pg, self.shape, self.dtype) - # test allreduce prod - test_allreduce_prod(pg, self.shape, self.dtype) + # test broadcast + test_broadcast(pg, self.shape, self.dtype) - # test broadcast - test_broadcast(pg, self.shape, self.dtype) + # test barrier + test_barrair(pg) - # test barrier - test_barrair(pg) + # test allgather + test_allgather(pg, self.shape, self.dtype) - # test allgather - test_allgather(pg, self.shape, self.dtype) + # test alltoall + test_all2all(pg, self.shape, self.dtype) - # test alltoall - test_all2all(pg, self.shape, self.dtype) + # test Reduce + test_reduce_sum(pg, self.shape, self.dtype) - # test Reduce - test_reduce_sum(pg, self.shape, self.dtype) + # test reduce max + test_reduce_max(pg, self.shape, self.dtype) - # test reduce max - test_reduce_max(pg, self.shape, self.dtype) + # test reduce min + test_reduce_min(pg, self.shape, self.dtype) - # test reduce min - test_reduce_min(pg, self.shape, self.dtype) + # test reduce product + test_reduce_prod(pg, self.shape, self.dtype) - # test reduce product - test_reduce_prod(pg, self.shape, self.dtype) + # test Scatter + test_scatter(pg, self.shape, self.dtype) - # test Scatter - test_scatter(pg, self.shape, self.dtype) - - # test send recv. - test_send_recv(pg, group, self.shape, self.dtype) + # test send recv. + test_send_recv(pg, group, self.shape, self.dtype) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py index 2e585e1cc6..ba4ec91a80 100644 --- a/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py +++ b/python/paddle/fluid/tests/unittests/collective/process_group_nccl.py @@ -20,7 +20,6 @@ import numpy as np import paddle import paddle.distributed as dist from paddle.fluid.dygraph.parallel import ParallelEnv -from paddle.fluid.framework import _test_eager_guard def init_process_group(strategy=None): @@ -44,459 +43,428 @@ class TestProcessGroupFp32(unittest.TestCase): self.shape = (2, 10, 5) def test_create_process_group_nccl(self): - with _test_eager_guard(): - device_id = paddle.distributed.ParallelEnv().dev_id - paddle.set_device('gpu:%d' % device_id) - - pg = init_process_group() - print("rank:", pg.rank(), "size:", pg.size(), "name:", pg.name()) - print("test new group api ok") - - # test allreduce sum - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - sum_result = tensor_x + tensor_y - if pg.rank() == 0: - task = dist.all_reduce(tensor_x) - assert np.array_equal(tensor_x, sum_result) - else: - task = dist.all_reduce(tensor_y) - assert np.array_equal(tensor_y, sum_result) - - print("test allreduce sum api ok") - - # test allreduce max - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - max_result = paddle.maximum(tensor_x, tensor_y) - - if pg.rank() == 0: - task = dist.all_reduce( - tensor_x, dist.ReduceOp.MAX, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_x, max_result) - else: - task = dist.all_reduce( - tensor_y, dist.ReduceOp.MAX, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_y, max_result) - - print("test allreduce max api ok") - - # test allreduce min - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - min_result = paddle.minimum(tensor_x, tensor_y) - - if pg.rank() == 0: - task = dist.all_reduce( - tensor_x, dist.ReduceOp.MIN, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_x, min_result) - else: - task = dist.all_reduce( - tensor_y, dist.ReduceOp.MIN, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_y, min_result) - - print("test allreduce min api ok") - - # test allreduce prod - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - prod_result = np.multiply(x, y) - - if pg.rank() == 0: - task = dist.all_reduce( - tensor_x, dist.ReduceOp.PROD, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_x, prod_result) - else: - task = dist.all_reduce( - tensor_y, dist.ReduceOp.PROD, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_y, prod_result) - - print("test allreduce prod api ok") - - # test broadcast - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - broadcast_result = paddle.assign(tensor_x) - if pg.rank() == 0: - task = dist.broadcast(tensor_x, 0, sync_op=False) - task.synchronize() - paddle.device.cuda.synchronize() - assert task.is_completed() - assert np.array_equal(broadcast_result, tensor_x) - else: - task = dist.broadcast(tensor_y, 0) - paddle.device.cuda.synchronize() - assert np.array_equal(broadcast_result, tensor_y) - - print("test broadcast api ok") - - # test broadcast with shape=[] - # rank 0 - x = np.random.random([]).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random([]).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - broadcast_result = paddle.assign(tensor_x) - if pg.rank() == 0: - task = dist.broadcast(tensor_x, 0, sync_op=False) - task.synchronize() - paddle.device.cuda.synchronize() - assert task.is_completed() - assert np.array_equal(broadcast_result, tensor_x) - else: - task = dist.broadcast(tensor_y, 0) - paddle.device.cuda.synchronize() - assert np.array_equal(broadcast_result, tensor_y) - assert tensor_y.shape == [] - - print("test broadcast api with shape=[] ok") - - # test barrier - # rank 0 - if pg.rank() == 0: - pg.barrier(device_id) - # rank 1 - else: - task = pg.barrier(device_id) - task.wait() - - print("test barrier api ok\n") - - # test allgather - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - out_shape = list(self.shape) - out_shape[0] *= 2 - out = np.random.random(out_shape).astype(self.dtype) - tensor_out = paddle.to_tensor(out) - if pg.rank() == 0: - task = pg.all_gather(tensor_x, tensor_out) - task.wait() - paddle.device.cuda.synchronize() - # rank 1 - else: - tensor_out_list = [ - paddle.empty_like(tensor_x), - paddle.empty_like(tensor_x), - ] - task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False) - paddle.device.cuda.synchronize() - tensor_out = paddle.concat(tensor_out_list) - out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) - out_2 = paddle.slice( - tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] - ) - assert np.array_equal(tensor_x, out_1) - assert np.array_equal(tensor_y, out_2) - print("test allgather api ok\n") - - if pg.rank() == 0: - task = pg.all_gather(tensor_x, tensor_out) - task.wait() - paddle.device.cuda.synchronize() - # rank 1 - else: - tensor_out_list = [] - task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False) - paddle.device.cuda.synchronize() - tensor_out = paddle.concat(tensor_out_list) - out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) - out_2 = paddle.slice( - tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] - ) - assert np.array_equal(tensor_x, out_1) - assert np.array_equal(tensor_y, out_2) - print("test allgather api2 ok\n") - - # test alltoall - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - out1 = np.random.random(self.shape).astype(self.dtype) - out2 = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - tensor_out1 = paddle.to_tensor(out1) - tensor_out2 = paddle.to_tensor(out2) - raw_tensor_x_2 = paddle.slice( - tensor_x, [0], [self.shape[0] // 2], [self.shape[0]] - ) - raw_tensor_y_1 = paddle.slice( - tensor_y, [0], [0], [self.shape[0] // 2] - ) - if pg.rank() == 0: - task = pg.alltoall(tensor_x, tensor_out1) - task.wait() - # rank 1 - else: - in_1, in_2 = paddle.split(tensor_y, 2) - out_1, out_2 = paddle.split(tensor_out2, 2) - out_tensor_list = [out_1, out_2] - task = dist.alltoall([in_1, in_2], out_tensor_list) - paddle.device.cuda.synchronize() - tensor_out2 = paddle.concat(out_tensor_list) - out1_2 = paddle.slice( - tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]] - ) - out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2]) - if pg.rank() == 0: - assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy()) - else: - assert np.array_equal(out2_1, raw_tensor_x_2) - print("test alltoall api ok\n") - - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - out1 = np.random.random(self.shape).astype(self.dtype) - out2 = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - tensor_out1 = paddle.to_tensor(out1) - tensor_out2 = paddle.to_tensor(out2) - raw_tensor_x_2 = paddle.slice( - tensor_x, [0], [self.shape[0] // 2], [self.shape[0]] - ) - raw_tensor_y_1 = paddle.slice( - tensor_y, [0], [0], [self.shape[0] // 2] - ) - if pg.rank() == 0: - task = pg.alltoall(tensor_x, tensor_out1) - task.wait() - # rank 1 - else: - in_1, in_2 = paddle.split(tensor_y, 2) - out_1, out_2 = paddle.split(tensor_out2, 2) - out_tensor_list = [] - task = dist.alltoall([in_1, in_2], out_tensor_list) - paddle.device.cuda.synchronize() - tensor_out2 = paddle.concat(out_tensor_list) - out1_2 = paddle.slice( - tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]] - ) - out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2]) - if pg.rank() == 0: - assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy()) - else: - assert np.array_equal(out2_1, raw_tensor_x_2) - print("test alltoall api2 ok\n") - - # test Reduce - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - sum_result = tensor_x + tensor_y - if pg.rank() == 0: - task = dist.reduce(tensor_x, 0, sync_op=True) - paddle.device.cuda.synchronize() - # rank 1 - else: - task = dist.reduce(tensor_y, 0, sync_op=False) - task.wait() - paddle.device.cuda.synchronize() - if pg.rank() == 0: - assert np.array_equal(tensor_x, sum_result) - print("test reduce sum api ok\n") - - # test reduce max - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - max_result = paddle.maximum(tensor_x, tensor_y) - - if pg.rank() == 0: - task = dist.reduce( - tensor_x, 0, dist.ReduceOp.MAX, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_x, max_result) - else: - task = dist.reduce( - tensor_y, 0, dist.ReduceOp.MAX, sync_op=False - ) - task.wait() - - print("test reduce max api ok") - - # test reduce min - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - min_result = paddle.minimum(tensor_x, tensor_y) - - if pg.rank() == 0: - task = dist.reduce( - tensor_x, 0, dist.ReduceOp.MIN, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_x, min_result) - else: - task = dist.reduce( - tensor_y, 0, dist.ReduceOp.MIN, sync_op=False - ) - task.wait() - - print("test reduce min api ok") - - # test reduce product - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - prod_result = np.multiply(x, y) - - if pg.rank() == 0: - task = dist.reduce( - tensor_x, 0, dist.ReduceOp.PROD, sync_op=False - ) - task.wait() - assert np.array_equal(tensor_x, prod_result) - else: - task = dist.reduce( - tensor_y, 0, dist.ReduceOp.PROD, sync_op=False - ) - task.wait() - - print("test reduce prod api ok") - # test Scatter - # rank 0 - in_shape = list(self.shape) - in_shape[0] *= 2 - x = np.random.random(in_shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - if pg.rank() == 0: - in_1, in_2 = paddle.split(tensor_x, 2) - task = dist.scatter(tensor_y, [in_1, in_2], 0, sync_op=True) - # task.wait() - paddle.device.cuda.synchronize() - # rank 1 - else: - task = dist.scatter(tensor_y, [], 0, sync_op=False) - task.wait() - paddle.device.cuda.synchronize() - out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) - out2 = paddle.slice( - tensor_x, [0], [self.shape[0]], [self.shape[0] * 2] - ) - if pg.rank() == 0: - assert np.array_equal(tensor_y, out1) - else: - assert np.array_equal(tensor_y, out2) - print("test scatter api ok\n") - - # test Scatter with shape=[] - # rank 0 - x = np.random.random([]).astype(self.dtype) - y = np.random.random([]).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - if pg.rank() == 0: - in_1, in_2 = tensor_x, tensor_x + 1 - task = dist.scatter(tensor_y, [in_1, in_2], 0, sync_op=True) - paddle.device.cuda.synchronize() - # rank 1 - else: - task = dist.scatter(tensor_y, [], 0, sync_op=True) - task.wait() - paddle.device.cuda.synchronize() - out1 = paddle.assign(tensor_x) - out2 = paddle.assign(tensor_x + 1) - if pg.rank() == 0: - assert np.array_equal(tensor_y, out1) - else: - assert np.array_equal(tensor_y, out2), f"{tensor_y}, {out2}" - assert tensor_y.shape == [] - print("test scatter api with shape=[] ok\n") - - # test send min - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - if pg.rank() == 0: - task = dist.send(tensor_x, 1, sync_op=False) - task.wait() - else: - task = dist.recv(tensor_y, 0, sync_op=False) - task.wait() - assert np.array_equal(tensor_y, tensor_x) - - print("test send api ok") - - # test send min - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - if pg.rank() == 0: - task = dist.send(tensor_x, 1, sync_op=True) - else: - task = dist.recv(tensor_y, 0, sync_op=True) - assert np.array_equal(tensor_y, tensor_x) - - print("test send api ok") + device_id = paddle.distributed.ParallelEnv().dev_id + paddle.set_device('gpu:%d' % device_id) + + pg = init_process_group() + print("rank:", pg.rank(), "size:", pg.size(), "name:", pg.name()) + print("test new group api ok") + + # test allreduce sum + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + sum_result = tensor_x + tensor_y + if pg.rank() == 0: + task = dist.all_reduce(tensor_x) + assert np.array_equal(tensor_x, sum_result) + else: + task = dist.all_reduce(tensor_y) + assert np.array_equal(tensor_y, sum_result) + + print("test allreduce sum api ok") + + # test allreduce max + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + max_result = paddle.maximum(tensor_x, tensor_y) + + if pg.rank() == 0: + task = dist.all_reduce(tensor_x, dist.ReduceOp.MAX, sync_op=False) + task.wait() + assert np.array_equal(tensor_x, max_result) + else: + task = dist.all_reduce(tensor_y, dist.ReduceOp.MAX, sync_op=False) + task.wait() + assert np.array_equal(tensor_y, max_result) + + print("test allreduce max api ok") + + # test allreduce min + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + min_result = paddle.minimum(tensor_x, tensor_y) + + if pg.rank() == 0: + task = dist.all_reduce(tensor_x, dist.ReduceOp.MIN, sync_op=False) + task.wait() + assert np.array_equal(tensor_x, min_result) + else: + task = dist.all_reduce(tensor_y, dist.ReduceOp.MIN, sync_op=False) + task.wait() + assert np.array_equal(tensor_y, min_result) + + print("test allreduce min api ok") + + # test allreduce prod + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + prod_result = np.multiply(x, y) + + if pg.rank() == 0: + task = dist.all_reduce(tensor_x, dist.ReduceOp.PROD, sync_op=False) + task.wait() + assert np.array_equal(tensor_x, prod_result) + else: + task = dist.all_reduce(tensor_y, dist.ReduceOp.PROD, sync_op=False) + task.wait() + assert np.array_equal(tensor_y, prod_result) + + print("test allreduce prod api ok") + + # test broadcast + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + broadcast_result = paddle.assign(tensor_x) + if pg.rank() == 0: + task = dist.broadcast(tensor_x, 0, sync_op=False) + task.synchronize() + paddle.device.cuda.synchronize() + assert task.is_completed() + assert np.array_equal(broadcast_result, tensor_x) + else: + task = dist.broadcast(tensor_y, 0) + paddle.device.cuda.synchronize() + assert np.array_equal(broadcast_result, tensor_y) + + print("test broadcast api ok") + + # test broadcast with shape=[] + # rank 0 + x = np.random.random([]).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random([]).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + broadcast_result = paddle.assign(tensor_x) + if pg.rank() == 0: + task = dist.broadcast(tensor_x, 0, sync_op=False) + task.synchronize() + paddle.device.cuda.synchronize() + assert task.is_completed() + assert np.array_equal(broadcast_result, tensor_x) + else: + task = dist.broadcast(tensor_y, 0) + paddle.device.cuda.synchronize() + assert np.array_equal(broadcast_result, tensor_y) + assert tensor_y.shape == [] + + print("test broadcast api with shape=[] ok") + + # test barrier + # rank 0 + if pg.rank() == 0: + pg.barrier(device_id) + # rank 1 + else: + task = pg.barrier(device_id) + task.wait() + + print("test barrier api ok\n") + + # test allgather + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + out_shape = list(self.shape) + out_shape[0] *= 2 + out = np.random.random(out_shape).astype(self.dtype) + tensor_out = paddle.to_tensor(out) + if pg.rank() == 0: + task = pg.all_gather(tensor_x, tensor_out) + task.wait() + paddle.device.cuda.synchronize() + # rank 1 + else: + tensor_out_list = [ + paddle.empty_like(tensor_x), + paddle.empty_like(tensor_x), + ] + task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False) + paddle.device.cuda.synchronize() + tensor_out = paddle.concat(tensor_out_list) + out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) + out_2 = paddle.slice( + tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] + ) + assert np.array_equal(tensor_x, out_1) + assert np.array_equal(tensor_y, out_2) + print("test allgather api ok\n") + + if pg.rank() == 0: + task = pg.all_gather(tensor_x, tensor_out) + task.wait() + paddle.device.cuda.synchronize() + # rank 1 + else: + tensor_out_list = [] + task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False) + paddle.device.cuda.synchronize() + tensor_out = paddle.concat(tensor_out_list) + out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) + out_2 = paddle.slice( + tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] + ) + assert np.array_equal(tensor_x, out_1) + assert np.array_equal(tensor_y, out_2) + print("test allgather api2 ok\n") + + # test alltoall + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + out1 = np.random.random(self.shape).astype(self.dtype) + out2 = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + tensor_out1 = paddle.to_tensor(out1) + tensor_out2 = paddle.to_tensor(out2) + raw_tensor_x_2 = paddle.slice( + tensor_x, [0], [self.shape[0] // 2], [self.shape[0]] + ) + raw_tensor_y_1 = paddle.slice(tensor_y, [0], [0], [self.shape[0] // 2]) + if pg.rank() == 0: + task = pg.alltoall(tensor_x, tensor_out1) + task.wait() + # rank 1 + else: + in_1, in_2 = paddle.split(tensor_y, 2) + out_1, out_2 = paddle.split(tensor_out2, 2) + out_tensor_list = [out_1, out_2] + task = dist.alltoall([in_1, in_2], out_tensor_list) + paddle.device.cuda.synchronize() + tensor_out2 = paddle.concat(out_tensor_list) + out1_2 = paddle.slice( + tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]] + ) + out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2]) + if pg.rank() == 0: + assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy()) + else: + assert np.array_equal(out2_1, raw_tensor_x_2) + print("test alltoall api ok\n") + + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + out1 = np.random.random(self.shape).astype(self.dtype) + out2 = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + tensor_out1 = paddle.to_tensor(out1) + tensor_out2 = paddle.to_tensor(out2) + raw_tensor_x_2 = paddle.slice( + tensor_x, [0], [self.shape[0] // 2], [self.shape[0]] + ) + raw_tensor_y_1 = paddle.slice(tensor_y, [0], [0], [self.shape[0] // 2]) + if pg.rank() == 0: + task = pg.alltoall(tensor_x, tensor_out1) + task.wait() + # rank 1 + else: + in_1, in_2 = paddle.split(tensor_y, 2) + out_1, out_2 = paddle.split(tensor_out2, 2) + out_tensor_list = [] + task = dist.alltoall([in_1, in_2], out_tensor_list) + paddle.device.cuda.synchronize() + tensor_out2 = paddle.concat(out_tensor_list) + out1_2 = paddle.slice( + tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]] + ) + out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2]) + if pg.rank() == 0: + assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy()) + else: + assert np.array_equal(out2_1, raw_tensor_x_2) + print("test alltoall api2 ok\n") + + # test Reduce + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + sum_result = tensor_x + tensor_y + if pg.rank() == 0: + task = dist.reduce(tensor_x, 0, sync_op=True) + paddle.device.cuda.synchronize() + # rank 1 + else: + task = dist.reduce(tensor_y, 0, sync_op=False) + task.wait() + paddle.device.cuda.synchronize() + if pg.rank() == 0: + assert np.array_equal(tensor_x, sum_result) + print("test reduce sum api ok\n") + + # test reduce max + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + max_result = paddle.maximum(tensor_x, tensor_y) + + if pg.rank() == 0: + task = dist.reduce(tensor_x, 0, dist.ReduceOp.MAX, sync_op=False) + task.wait() + assert np.array_equal(tensor_x, max_result) + else: + task = dist.reduce(tensor_y, 0, dist.ReduceOp.MAX, sync_op=False) + task.wait() + + print("test reduce max api ok") + + # test reduce min + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + min_result = paddle.minimum(tensor_x, tensor_y) + + if pg.rank() == 0: + task = dist.reduce(tensor_x, 0, dist.ReduceOp.MIN, sync_op=False) + task.wait() + assert np.array_equal(tensor_x, min_result) + else: + task = dist.reduce(tensor_y, 0, dist.ReduceOp.MIN, sync_op=False) + task.wait() + + print("test reduce min api ok") + + # test reduce product + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + prod_result = np.multiply(x, y) + + if pg.rank() == 0: + task = dist.reduce(tensor_x, 0, dist.ReduceOp.PROD, sync_op=False) + task.wait() + assert np.array_equal(tensor_x, prod_result) + else: + task = dist.reduce(tensor_y, 0, dist.ReduceOp.PROD, sync_op=False) + task.wait() + + print("test reduce prod api ok") + # test Scatter + # rank 0 + in_shape = list(self.shape) + in_shape[0] *= 2 + x = np.random.random(in_shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + if pg.rank() == 0: + in_1, in_2 = paddle.split(tensor_x, 2) + task = dist.scatter(tensor_y, [in_1, in_2], 0, sync_op=True) + # task.wait() + paddle.device.cuda.synchronize() + # rank 1 + else: + task = dist.scatter(tensor_y, [], 0, sync_op=False) + task.wait() + paddle.device.cuda.synchronize() + out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]]) + out2 = paddle.slice(tensor_x, [0], [self.shape[0]], [self.shape[0] * 2]) + if pg.rank() == 0: + assert np.array_equal(tensor_y, out1) + else: + assert np.array_equal(tensor_y, out2) + print("test scatter api ok\n") + + # test Scatter with shape=[] + # rank 0 + x = np.random.random([]).astype(self.dtype) + y = np.random.random([]).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + if pg.rank() == 0: + in_1, in_2 = tensor_x, tensor_x + 1 + task = dist.scatter(tensor_y, [in_1, in_2], 0, sync_op=True) + paddle.device.cuda.synchronize() + # rank 1 + else: + task = dist.scatter(tensor_y, [], 0, sync_op=True) + task.wait() + paddle.device.cuda.synchronize() + out1 = paddle.assign(tensor_x) + out2 = paddle.assign(tensor_x + 1) + if pg.rank() == 0: + assert np.array_equal(tensor_y, out1) + else: + assert np.array_equal(tensor_y, out2), f"{tensor_y}, {out2}" + assert tensor_y.shape == [] + print("test scatter api with shape=[] ok\n") + + # test send min + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + if pg.rank() == 0: + task = dist.send(tensor_x, 1, sync_op=False) + task.wait() + else: + task = dist.recv(tensor_y, 0, sync_op=False) + task.wait() + assert np.array_equal(tensor_y, tensor_x) + + print("test send api ok") + + # test send min + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + if pg.rank() == 0: + task = dist.send(tensor_x, 1, sync_op=True) + else: + task = dist.recv(tensor_y, 0, sync_op=True) + assert np.array_equal(tensor_y, tensor_x) + + print("test send api ok") class TestProcessGroupFp16(TestProcessGroupFp32): -- GitLab