Unverified commit e83f5f33, authored by 姜永久, committed by GitHub

remove xpu eager guard tests (#48786)

Parent 25dafc58
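
The pattern removed throughout this commit is an explicit eager-mode guard wrapped around each test body. A minimal, self-contained sketch of the refactor is shown below; the `_test_eager_guard` context manager here is a stub standing in for the removed `paddle.fluid.framework._test_eager_guard`, and `check()` is a hypothetical placeholder for a test body.

import contextlib
import unittest


@contextlib.contextmanager
def _test_eager_guard():
    # Stub for the legacy paddle.fluid.framework._test_eager_guard, which
    # switched dygraph execution into eager mode for the enclosed block.
    yield


def check():
    # Hypothetical stand-in for a real test body.
    assert 1 + 1 == 2


class BeforeStyle(unittest.TestCase):
    def test_case(self):
        with _test_eager_guard():  # old style: body runs under the guard
            check()


class AfterStyle(unittest.TestCase):
    def test_case(self):
        check()  # new style: eager mode is the default, so the guard goes away


if __name__ == '__main__':
    unittest.main()
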
@@ -19,7 +19,6 @@ import numpy as np
 import paddle
 import paddle.distributed as dist
 import paddle.fluid as fluid
-from paddle.fluid.framework import _test_eager_guard
 from paddle.nn import Linear
 
 paddle.seed(1024)
@@ -69,58 +68,57 @@ class SimpleNet(fluid.Layer):
 class TestDistTraning(unittest.TestCase):
     def test_multiple_xpus(self):
         self.trainer_id = dist.get_rank()
-        with _test_eager_guard():
-            self.pg = dist.init_parallel_env()
-
-            model_a = SimpleNet(self.trainer_id)
-            model_b = SimpleNet(self.trainer_id)
-
-            state_dict = model_a.state_dict()
-            model_b.set_state_dict(state_dict)
-
-            model_a = paddle.DataParallel(
-                model_a, find_unused_parameters=True, group=self.pg
-            )
-            model_b = paddle.DataParallel(
-                model_b, find_unused_parameters=True, group=self.pg
-            )
-
-            ones_input = paddle.ones(shape=(batch, in_dim))
-            ones_input.stop_gradient = True
-
-            w1_grad_sum = np.zeros((in_dim, out_dim), dtype='float32')
-            w2_grad_sum = np.zeros((in_dim, out_dim), dtype='float32')
-
-            for step_id in range(5):
-                random_input = paddle.rand(shape=(batch, in_dim))
-                random_input.stop_gradient = True
-
-                if step_id % 2 == 0:
-                    out_a = model_a(random_input)
-                    out_b = model_b(random_input)
-                else:
-                    out_a = model_a(ones_input)
-                    out_b = model_b(ones_input)
-
-                out_a.sum().backward()
-                out_b.sum().backward()
-
-                self.check_gradient(model_a.parameters())
-                self.check_gradient(model_b.parameters())
-
-                # test acc gradient
-                w1_grad_sum = self.check_acc(
-                    model_a._layers.w1.grad,
-                    w1_grad_sum,
-                    model_b._layers.w1.grad,
-                )
-                w2_grad_sum = self.check_acc(
-                    model_a._layers.w2.grad,
-                    w2_grad_sum,
-                    model_b._layers.w2.grad,
-                )
-
-                model_a.clear_gradients()
+        self.pg = dist.init_parallel_env()
+
+        model_a = SimpleNet(self.trainer_id)
+        model_b = SimpleNet(self.trainer_id)
+
+        state_dict = model_a.state_dict()
+        model_b.set_state_dict(state_dict)
+
+        model_a = paddle.DataParallel(
+            model_a, find_unused_parameters=True, group=self.pg
+        )
+        model_b = paddle.DataParallel(
+            model_b, find_unused_parameters=True, group=self.pg
+        )
+
+        ones_input = paddle.ones(shape=(batch, in_dim))
+        ones_input.stop_gradient = True
+
+        w1_grad_sum = np.zeros((in_dim, out_dim), dtype='float32')
+        w2_grad_sum = np.zeros((in_dim, out_dim), dtype='float32')
+
+        for step_id in range(5):
+            random_input = paddle.rand(shape=(batch, in_dim))
+            random_input.stop_gradient = True
+
+            if step_id % 2 == 0:
+                out_a = model_a(random_input)
+                out_b = model_b(random_input)
+            else:
+                out_a = model_a(ones_input)
+                out_b = model_b(ones_input)
+
+            out_a.sum().backward()
+            out_b.sum().backward()
+
+            self.check_gradient(model_a.parameters())
+            self.check_gradient(model_b.parameters())
+
+            # test acc gradient
+            w1_grad_sum = self.check_acc(
+                model_a._layers.w1.grad,
+                w1_grad_sum,
+                model_b._layers.w1.grad,
+            )
+            w2_grad_sum = self.check_acc(
+                model_a._layers.w2.grad,
+                w2_grad_sum,
+                model_b._layers.w2.grad,
+            )
+
+            model_a.clear_gradients()
 
     def check_acc(self, grad, grad_sum, acc_grad):
         if grad is not None:
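
In the loop above, `w1_grad_sum` and `w2_grad_sum` are running numpy sums of per-step gradients, and `check_acc` compares them against the other model's gradient, which keeps accumulating because only `model_a.clear_gradients()` is called. The diff truncates the body of `check_acc`, so the helper below is only an illustrative sketch of that accumulate-and-compare pattern, not the file's actual implementation.

import numpy as np


def accumulate_and_compare(step_grad, running_sum, reference_grad, rtol=1e-6):
    # Add this step's gradient to the running sum, then check that the sum
    # still matches the gradient accumulated on the reference model.
    if step_grad is not None:
        running_sum = running_sum + np.asarray(step_grad)
        np.testing.assert_allclose(
            running_sum, np.asarray(reference_grad), rtol=rtol
        )
    return running_sum
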
@@ -21,7 +21,6 @@ import numpy as np
 import paddle
 import paddle.distributed as dist
 from paddle.fluid.dygraph.parallel import ParallelEnv
-from paddle.fluid.framework import _test_eager_guard
 
 
 def init_process_group(strategy=None):
@@ -45,150 +44,137 @@ class TestProcessGroupFp32(unittest.TestCase):
         self.shape = (2, 10, 5)
 
     def test_create_process_group_bkcl(self):
-        with _test_eager_guard():
-            device_id = paddle.distributed.ParallelEnv().dev_id
-            paddle.set_device('xpu:%d' % device_id)
-
-            pg = init_process_group()
-            sys.stdout.write(
-                "rank {}: size {} name {}\n".format(
-                    pg.rank(), pg.size(), pg.name()
-                )
-            )
-            sys.stdout.write(
-                "rank {}: test new group api ok\n".format(pg.rank())
-            )
-
-            # test allreduce sum
-            # rank 0
-            x = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            # rank 1
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_y = paddle.to_tensor(y)
-
-            sum_result = tensor_x + tensor_y
-            if pg.rank() == 0:
-                task = dist.all_reduce(tensor_x)
-                assert np.array_equal(tensor_x, sum_result)
-            else:
-                task = dist.all_reduce(tensor_y)
-                assert np.array_equal(tensor_y, sum_result)
-
-            sys.stdout.write(
-                "rank {}: test allreduce sum api ok\n".format(pg.rank())
-            )
-
-            # TODO
-            # test allreduce max/min/prod
-
-            # test broadcast
-            # rank 0
-            x = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            # rank 1
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_y = paddle.to_tensor(y)
-
-            broadcast_result = paddle.assign(tensor_x)
-            if pg.rank() == 0:
-                # XPU don't support event query by now, so just use sync op here
-                task = dist.broadcast(tensor_x, 0)
-                paddle.device.xpu.synchronize()
-                assert np.array_equal(broadcast_result, tensor_x)
-            else:
-                task = dist.broadcast(tensor_y, 0)
-                paddle.device.xpu.synchronize()
-                assert np.array_equal(broadcast_result, tensor_y)
-
-            sys.stdout.write(
-                "rank {}: test broadcast api ok\n".format(pg.rank())
-            )
-
-            # test barrier
-            # rank 0
-            if pg.rank() == 0:
-                pg.barrier(device_id)
-            # rank 1
-            else:
-                task = pg.barrier(device_id)
-                task.wait()
-
-            sys.stdout.write("rank {}: test barrier api ok\n".format(pg.rank()))
-
-            # test allgather
-            # rank 0
-            x = np.random.random(self.shape).astype(self.dtype)
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            tensor_y = paddle.to_tensor(y)
-            out_shape = list(self.shape)
-            out_shape[0] *= 2
-            out = np.random.random(out_shape).astype(self.dtype)
-            tensor_out = paddle.to_tensor(out)
-            if pg.rank() == 0:
-                task = pg.all_gather(tensor_x, tensor_out)
-                task.wait()
-                paddle.device.xpu.synchronize()
-            # rank 1
-            else:
-                tensor_out_list = [
-                    paddle.empty_like(tensor_x),
-                    paddle.empty_like(tensor_x),
-                ]
-                task = dist.all_gather(tensor_out_list, tensor_y)
-                paddle.device.xpu.synchronize()
-                tensor_out = paddle.concat(tensor_out_list)
-            out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
-            out_2 = paddle.slice(
-                tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
-            )
-            assert np.array_equal(tensor_x, out_1)
-            assert np.array_equal(tensor_y, out_2)
-            sys.stdout.write(
-                "rank {}: test allgather api ok\n".format(pg.rank())
-            )
-
-            if pg.rank() == 0:
-                task = pg.all_gather(tensor_x, tensor_out)
-                task.wait()
-                paddle.device.xpu.synchronize()
-            # rank 1
-            else:
-                tensor_out_list = []
-                task = dist.all_gather(tensor_out_list, tensor_y)
-                paddle.device.xpu.synchronize()
-                tensor_out = paddle.concat(tensor_out_list)
-            out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
-            out_2 = paddle.slice(
-                tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
-            )
-            assert np.array_equal(tensor_x, out_1)
-            assert np.array_equal(tensor_y, out_2)
-            sys.stdout.write(
-                "rank {}: test allgather api2 ok\n".format(pg.rank())
-            )
-
-            # test Reduce
-            # rank 0
-            x = np.random.random(self.shape).astype(self.dtype)
-            y = np.random.random(self.shape).astype(self.dtype)
-            tensor_x = paddle.to_tensor(x)
-            tensor_y = paddle.to_tensor(y)
-            sum_result = tensor_x + tensor_y
-            if pg.rank() == 0:
-                task = dist.reduce(tensor_x, 0, sync_op=True)
-                paddle.device.xpu.synchronize()
-            # rank 1
-            else:
-                task = dist.reduce(tensor_y, 0, sync_op=False)
-                task.wait()
-                paddle.device.xpu.synchronize()
-            if pg.rank() == 0:
-                assert np.array_equal(tensor_x, sum_result)
-            sys.stdout.write(
-                "rank {}: test reduce sum api ok\n".format(pg.rank())
-            )
+        device_id = paddle.distributed.ParallelEnv().dev_id
+        paddle.set_device('xpu:%d' % device_id)
+
+        pg = init_process_group()
+        sys.stdout.write(
+            "rank {}: size {} name {}\n".format(pg.rank(), pg.size(), pg.name())
+        )
+        sys.stdout.write("rank {}: test new group api ok\n".format(pg.rank()))
+
+        # test allreduce sum
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        sum_result = tensor_x + tensor_y
+        if pg.rank() == 0:
+            task = dist.all_reduce(tensor_x)
+            assert np.array_equal(tensor_x, sum_result)
+        else:
+            task = dist.all_reduce(tensor_y)
+            assert np.array_equal(tensor_y, sum_result)
+
+        sys.stdout.write(
+            "rank {}: test allreduce sum api ok\n".format(pg.rank())
+        )
+
+        # TODO
+        # test allreduce max/min/prod
+
+        # test broadcast
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        # rank 1
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_y = paddle.to_tensor(y)
+
+        broadcast_result = paddle.assign(tensor_x)
+        if pg.rank() == 0:
+            # XPU don't support event query by now, so just use sync op here
+            task = dist.broadcast(tensor_x, 0)
+            paddle.device.xpu.synchronize()
+            assert np.array_equal(broadcast_result, tensor_x)
+        else:
+            task = dist.broadcast(tensor_y, 0)
+            paddle.device.xpu.synchronize()
+            assert np.array_equal(broadcast_result, tensor_y)
+
+        sys.stdout.write("rank {}: test broadcast api ok\n".format(pg.rank()))
+
+        # test barrier
+        # rank 0
+        if pg.rank() == 0:
+            pg.barrier(device_id)
+        # rank 1
+        else:
+            task = pg.barrier(device_id)
+            task.wait()
+
+        sys.stdout.write("rank {}: test barrier api ok\n".format(pg.rank()))
+
+        # test allgather
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        tensor_y = paddle.to_tensor(y)
+        out_shape = list(self.shape)
+        out_shape[0] *= 2
+        out = np.random.random(out_shape).astype(self.dtype)
+        tensor_out = paddle.to_tensor(out)
+        if pg.rank() == 0:
+            task = pg.all_gather(tensor_x, tensor_out)
+            task.wait()
+            paddle.device.xpu.synchronize()
+        # rank 1
+        else:
+            tensor_out_list = [
+                paddle.empty_like(tensor_x),
+                paddle.empty_like(tensor_x),
+            ]
+            task = dist.all_gather(tensor_out_list, tensor_y)
+            paddle.device.xpu.synchronize()
+            tensor_out = paddle.concat(tensor_out_list)
+        out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
+        out_2 = paddle.slice(
+            tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
+        )
+        assert np.array_equal(tensor_x, out_1)
+        assert np.array_equal(tensor_y, out_2)
+        sys.stdout.write("rank {}: test allgather api ok\n".format(pg.rank()))
+
+        if pg.rank() == 0:
+            task = pg.all_gather(tensor_x, tensor_out)
+            task.wait()
+            paddle.device.xpu.synchronize()
+        # rank 1
+        else:
+            tensor_out_list = []
+            task = dist.all_gather(tensor_out_list, tensor_y)
+            paddle.device.xpu.synchronize()
+            tensor_out = paddle.concat(tensor_out_list)
+        out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
+        out_2 = paddle.slice(
+            tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
+        )
+        assert np.array_equal(tensor_x, out_1)
+        assert np.array_equal(tensor_y, out_2)
+        sys.stdout.write("rank {}: test allgather api2 ok\n".format(pg.rank()))
+
+        # test Reduce
+        # rank 0
+        x = np.random.random(self.shape).astype(self.dtype)
+        y = np.random.random(self.shape).astype(self.dtype)
+        tensor_x = paddle.to_tensor(x)
+        tensor_y = paddle.to_tensor(y)
+        sum_result = tensor_x + tensor_y
+        if pg.rank() == 0:
+            task = dist.reduce(tensor_x, 0, sync_op=True)
+            paddle.device.xpu.synchronize()
+        # rank 1
+        else:
+            task = dist.reduce(tensor_y, 0, sync_op=False)
+            task.wait()
+            paddle.device.xpu.synchronize()
+        if pg.rank() == 0:
+            assert np.array_equal(tensor_x, sum_result)
+        sys.stdout.write("rank {}: test reduce sum api ok\n".format(pg.rank()))
 
 
 class TestProcessGroupFp16(TestProcessGroupFp32):
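
The reduce section above exercises both forms of the communication API: the blocking call (`sync_op=True`) and the task form (`sync_op=False` followed by `task.wait()`). A minimal sketch of that pattern is given below, assuming `dist.all_reduce` accepts the same `sync_op` keyword as the `dist.reduce` call in the diff and that the script is launched on two or more devices with `python -m paddle.distributed.launch`; it is an illustration, not part of this commit.

import paddle
import paddle.distributed as dist


def main():
    dist.init_parallel_env()
    x = paddle.ones([2, 3]) * (dist.get_rank() + 1)

    # Blocking form: the call returns only after the collective has finished.
    dist.all_reduce(x, sync_op=True)

    # Task form: launch the collective, then wait explicitly before reading y.
    y = paddle.ones([2, 3])
    task = dist.all_reduce(y, sync_op=False)
    task.wait()

    print(dist.get_rank(), float(x.sum()), float(y.sum()))


if __name__ == '__main__':
    main()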