From e83f5f330f5c097c3c0a613ed55ab56c3202134e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=A7=9C=E6=B0=B8=E4=B9=85?= <34344716+yjjiang11@users.noreply.github.com> Date: Fri, 9 Dec 2022 02:28:23 -0800 Subject: [PATCH] remove xpu eager guard tests (#48786) --- ...el_dygraph_gradient_check_in_eager_mode.py | 92 +++--- .../tests/unittests/xpu/process_group_bkcl.py | 276 +++++++++--------- 2 files changed, 176 insertions(+), 192 deletions(-) diff --git a/python/paddle/fluid/tests/unittests/xpu/parallel_dygraph_gradient_check_in_eager_mode.py b/python/paddle/fluid/tests/unittests/xpu/parallel_dygraph_gradient_check_in_eager_mode.py index 8ae00ca4af4..5ffe41adbeb 100644 --- a/python/paddle/fluid/tests/unittests/xpu/parallel_dygraph_gradient_check_in_eager_mode.py +++ b/python/paddle/fluid/tests/unittests/xpu/parallel_dygraph_gradient_check_in_eager_mode.py @@ -19,7 +19,6 @@ import numpy as np import paddle import paddle.distributed as dist import paddle.fluid as fluid -from paddle.fluid.framework import _test_eager_guard from paddle.nn import Linear paddle.seed(1024) @@ -69,58 +68,57 @@ class SimpleNet(fluid.Layer): class TestDistTraning(unittest.TestCase): def test_multiple_xpus(self): self.trainer_id = dist.get_rank() - with _test_eager_guard(): - self.pg = dist.init_parallel_env() + self.pg = dist.init_parallel_env() - model_a = SimpleNet(self.trainer_id) - model_b = SimpleNet(self.trainer_id) + model_a = SimpleNet(self.trainer_id) + model_b = SimpleNet(self.trainer_id) - state_dict = model_a.state_dict() - model_b.set_state_dict(state_dict) + state_dict = model_a.state_dict() + model_b.set_state_dict(state_dict) - model_a = paddle.DataParallel( - model_a, find_unused_parameters=True, group=self.pg + model_a = paddle.DataParallel( + model_a, find_unused_parameters=True, group=self.pg + ) + model_b = paddle.DataParallel( + model_b, find_unused_parameters=True, group=self.pg + ) + + ones_input = paddle.ones(shape=(batch, in_dim)) + ones_input.stop_gradient = True + + w1_grad_sum = np.zeros((in_dim, out_dim), dtype='float32') + w2_grad_sum = np.zeros((in_dim, out_dim), dtype='float32') + + for step_id in range(5): + random_input = paddle.rand(shape=(batch, in_dim)) + random_input.stop_gradient = True + + if step_id % 2 == 0: + out_a = model_a(random_input) + out_b = model_b(random_input) + else: + out_a = model_a(ones_input) + out_b = model_b(ones_input) + + out_a.sum().backward() + out_b.sum().backward() + + self.check_gradient(model_a.parameters()) + self.check_gradient(model_b.parameters()) + + # test acc gradient + w1_grad_sum = self.check_acc( + model_a._layers.w1.grad, + w1_grad_sum, + model_b._layers.w1.grad, ) - model_b = paddle.DataParallel( - model_b, find_unused_parameters=True, group=self.pg + w2_grad_sum = self.check_acc( + model_a._layers.w2.grad, + w2_grad_sum, + model_b._layers.w2.grad, ) - ones_input = paddle.ones(shape=(batch, in_dim)) - ones_input.stop_gradient = True - - w1_grad_sum = np.zeros((in_dim, out_dim), dtype='float32') - w2_grad_sum = np.zeros((in_dim, out_dim), dtype='float32') - - for step_id in range(5): - random_input = paddle.rand(shape=(batch, in_dim)) - random_input.stop_gradient = True - - if step_id % 2 == 0: - out_a = model_a(random_input) - out_b = model_b(random_input) - else: - out_a = model_a(ones_input) - out_b = model_b(ones_input) - - out_a.sum().backward() - out_b.sum().backward() - - self.check_gradient(model_a.parameters()) - self.check_gradient(model_b.parameters()) - - # test acc gradient - w1_grad_sum = self.check_acc( - model_a._layers.w1.grad, - w1_grad_sum, - model_b._layers.w1.grad, - ) - w2_grad_sum = self.check_acc( - model_a._layers.w2.grad, - w2_grad_sum, - model_b._layers.w2.grad, - ) - - model_a.clear_gradients() + model_a.clear_gradients() def check_acc(self, grad, grad_sum, acc_grad): if grad is not None: diff --git a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py index 0578df80107..ff94376b23d 100644 --- a/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py +++ b/python/paddle/fluid/tests/unittests/xpu/process_group_bkcl.py @@ -21,7 +21,6 @@ import numpy as np import paddle import paddle.distributed as dist from paddle.fluid.dygraph.parallel import ParallelEnv -from paddle.fluid.framework import _test_eager_guard def init_process_group(strategy=None): @@ -45,150 +44,137 @@ class TestProcessGroupFp32(unittest.TestCase): self.shape = (2, 10, 5) def test_create_process_group_bkcl(self): - with _test_eager_guard(): - device_id = paddle.distributed.ParallelEnv().dev_id - paddle.set_device('xpu:%d' % device_id) - - pg = init_process_group() - sys.stdout.write( - "rank {}: size {} name {}\n".format( - pg.rank(), pg.size(), pg.name() - ) - ) - sys.stdout.write( - "rank {}: test new group api ok\n".format(pg.rank()) - ) - - # test allreduce sum - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - sum_result = tensor_x + tensor_y - if pg.rank() == 0: - task = dist.all_reduce(tensor_x) - assert np.array_equal(tensor_x, sum_result) - else: - task = dist.all_reduce(tensor_y) - assert np.array_equal(tensor_y, sum_result) - - sys.stdout.write( - "rank {}: test allreduce sum api ok\n".format(pg.rank()) - ) - - # TODO - # test allreduce max/min/prod - - # test broadcast - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - # rank 1 - y = np.random.random(self.shape).astype(self.dtype) - tensor_y = paddle.to_tensor(y) - - broadcast_result = paddle.assign(tensor_x) - if pg.rank() == 0: - # XPU don't support event query by now, so just use sync op here - task = dist.broadcast(tensor_x, 0) - paddle.device.xpu.synchronize() - assert np.array_equal(broadcast_result, tensor_x) - else: - task = dist.broadcast(tensor_y, 0) - paddle.device.xpu.synchronize() - assert np.array_equal(broadcast_result, tensor_y) - - sys.stdout.write( - "rank {}: test broadcast api ok\n".format(pg.rank()) - ) - - # test barrier - # rank 0 - if pg.rank() == 0: - pg.barrier(device_id) - # rank 1 - else: - task = pg.barrier(device_id) - task.wait() - - sys.stdout.write("rank {}: test barrier api ok\n".format(pg.rank())) - - # test allgather - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - out_shape = list(self.shape) - out_shape[0] *= 2 - out = np.random.random(out_shape).astype(self.dtype) - tensor_out = paddle.to_tensor(out) - if pg.rank() == 0: - task = pg.all_gather(tensor_x, tensor_out) - task.wait() - paddle.device.xpu.synchronize() - # rank 1 - else: - tensor_out_list = [ - paddle.empty_like(tensor_x), - paddle.empty_like(tensor_x), - ] - task = dist.all_gather(tensor_out_list, tensor_y) - paddle.device.xpu.synchronize() - tensor_out = paddle.concat(tensor_out_list) - out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) - out_2 = paddle.slice( - tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] - ) - assert np.array_equal(tensor_x, out_1) - assert np.array_equal(tensor_y, out_2) - sys.stdout.write( - "rank {}: test allgather api ok\n".format(pg.rank()) - ) - - if pg.rank() == 0: - task = pg.all_gather(tensor_x, tensor_out) - task.wait() - paddle.device.xpu.synchronize() - # rank 1 - else: - tensor_out_list = [] - task = dist.all_gather(tensor_out_list, tensor_y) - paddle.device.xpu.synchronize() - tensor_out = paddle.concat(tensor_out_list) - out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) - out_2 = paddle.slice( - tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] - ) - assert np.array_equal(tensor_x, out_1) - assert np.array_equal(tensor_y, out_2) - sys.stdout.write( - "rank {}: test allgather api2 ok\n".format(pg.rank()) - ) - - # test Reduce - # rank 0 - x = np.random.random(self.shape).astype(self.dtype) - y = np.random.random(self.shape).astype(self.dtype) - tensor_x = paddle.to_tensor(x) - tensor_y = paddle.to_tensor(y) - sum_result = tensor_x + tensor_y - if pg.rank() == 0: - task = dist.reduce(tensor_x, 0, sync_op=True) - paddle.device.xpu.synchronize() - # rank 1 - else: - task = dist.reduce(tensor_y, 0, sync_op=False) - task.wait() - paddle.device.xpu.synchronize() - if pg.rank() == 0: - assert np.array_equal(tensor_x, sum_result) - sys.stdout.write( - "rank {}: test reduce sum api ok\n".format(pg.rank()) - ) + device_id = paddle.distributed.ParallelEnv().dev_id + paddle.set_device('xpu:%d' % device_id) + + pg = init_process_group() + sys.stdout.write( + "rank {}: size {} name {}\n".format(pg.rank(), pg.size(), pg.name()) + ) + sys.stdout.write("rank {}: test new group api ok\n".format(pg.rank())) + + # test allreduce sum + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + sum_result = tensor_x + tensor_y + if pg.rank() == 0: + task = dist.all_reduce(tensor_x) + assert np.array_equal(tensor_x, sum_result) + else: + task = dist.all_reduce(tensor_y) + assert np.array_equal(tensor_y, sum_result) + + sys.stdout.write( + "rank {}: test allreduce sum api ok\n".format(pg.rank()) + ) + + # TODO + # test allreduce max/min/prod + + # test broadcast + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + # rank 1 + y = np.random.random(self.shape).astype(self.dtype) + tensor_y = paddle.to_tensor(y) + + broadcast_result = paddle.assign(tensor_x) + if pg.rank() == 0: + # XPU don't support event query by now, so just use sync op here + task = dist.broadcast(tensor_x, 0) + paddle.device.xpu.synchronize() + assert np.array_equal(broadcast_result, tensor_x) + else: + task = dist.broadcast(tensor_y, 0) + paddle.device.xpu.synchronize() + assert np.array_equal(broadcast_result, tensor_y) + + sys.stdout.write("rank {}: test broadcast api ok\n".format(pg.rank())) + + # test barrier + # rank 0 + if pg.rank() == 0: + pg.barrier(device_id) + # rank 1 + else: + task = pg.barrier(device_id) + task.wait() + + sys.stdout.write("rank {}: test barrier api ok\n".format(pg.rank())) + + # test allgather + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + out_shape = list(self.shape) + out_shape[0] *= 2 + out = np.random.random(out_shape).astype(self.dtype) + tensor_out = paddle.to_tensor(out) + if pg.rank() == 0: + task = pg.all_gather(tensor_x, tensor_out) + task.wait() + paddle.device.xpu.synchronize() + # rank 1 + else: + tensor_out_list = [ + paddle.empty_like(tensor_x), + paddle.empty_like(tensor_x), + ] + task = dist.all_gather(tensor_out_list, tensor_y) + paddle.device.xpu.synchronize() + tensor_out = paddle.concat(tensor_out_list) + out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) + out_2 = paddle.slice( + tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] + ) + assert np.array_equal(tensor_x, out_1) + assert np.array_equal(tensor_y, out_2) + sys.stdout.write("rank {}: test allgather api ok\n".format(pg.rank())) + + if pg.rank() == 0: + task = pg.all_gather(tensor_x, tensor_out) + task.wait() + paddle.device.xpu.synchronize() + # rank 1 + else: + tensor_out_list = [] + task = dist.all_gather(tensor_out_list, tensor_y) + paddle.device.xpu.synchronize() + tensor_out = paddle.concat(tensor_out_list) + out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2]) + out_2 = paddle.slice( + tensor_out, [0], [out_shape[0] // 2], [out_shape[0]] + ) + assert np.array_equal(tensor_x, out_1) + assert np.array_equal(tensor_y, out_2) + sys.stdout.write("rank {}: test allgather api2 ok\n".format(pg.rank())) + + # test Reduce + # rank 0 + x = np.random.random(self.shape).astype(self.dtype) + y = np.random.random(self.shape).astype(self.dtype) + tensor_x = paddle.to_tensor(x) + tensor_y = paddle.to_tensor(y) + sum_result = tensor_x + tensor_y + if pg.rank() == 0: + task = dist.reduce(tensor_x, 0, sync_op=True) + paddle.device.xpu.synchronize() + # rank 1 + else: + task = dist.reduce(tensor_y, 0, sync_op=False) + task.wait() + paddle.device.xpu.synchronize() + if pg.rank() == 0: + assert np.array_equal(tensor_x, sum_result) + sys.stdout.write("rank {}: test reduce sum api ok\n".format(pg.rank())) class TestProcessGroupFp16(TestProcessGroupFp32): -- GitLab