Unverified commit a6a4895a, authored by Yulong Ao, committed by GitHub

[0D Tensor] Add tests of 0D Tensor for allgather and allreduce (#49175)

Parent 495c1fc0
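For orientation, here is a minimal, self-contained sketch (not part of this patch) of the 0-D all_reduce pattern the new tests exercise. It assumes two GPUs and a launcher such as `python -m paddle.distributed.launch --gpus 0,1 demo_0d_allreduce.py`; the script name is only a placeholder.

    # Hypothetical standalone demo, not part of the diff below.
    import numpy as np
    import paddle
    import paddle.distributed as dist

    dist.init_parallel_env()
    # A 0-D (scalar) tensor: np.random.random([]) has shape ().
    x = paddle.to_tensor(np.random.random([]).astype("float32"))
    dist.all_reduce(x)  # in-place; the default reduce op is SUM
    # The shape stays [] and every rank now holds the same summed value.
    print(dist.get_rank(), x.shape, float(x))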
@@ -68,6 +68,24 @@ class TestProcessGroupFp32(unittest.TestCase):
print("test allreduce sum api ok")
# test allreduce sum with shape = []
# rank 0
x = np.random.random([]).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random([]).astype(self.dtype)
tensor_y = paddle.to_tensor(y)

sum_result = tensor_x + tensor_y
if pg.rank() == 0:
    task = dist.all_reduce(tensor_x)
    assert np.array_equal(tensor_x, sum_result)
else:
    task = dist.all_reduce(tensor_y)
    assert np.array_equal(tensor_y, sum_result)

print("test allreduce sum api with shape = [] ok")
# test allreduce max
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
@@ -89,6 +107,27 @@ class TestProcessGroupFp32(unittest.TestCase):
print("test allreduce max api ok")
# test allreduce max with shape = []
# rank 0
x = np.random.random([]).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random([]).astype(self.dtype)
tensor_y = paddle.to_tensor(y)

max_result = paddle.maximum(tensor_x, tensor_y)

if pg.rank() == 0:
    task = dist.all_reduce(tensor_x, dist.ReduceOp.MAX, sync_op=False)
    task.wait()
    assert np.array_equal(tensor_x, max_result)
else:
    task = dist.all_reduce(tensor_y, dist.ReduceOp.MAX, sync_op=False)
    task.wait()
    assert np.array_equal(tensor_y, max_result)

print("test allreduce max api with shape = [] ok")
# test allreduce min
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
@@ -110,6 +149,27 @@ class TestProcessGroupFp32(unittest.TestCase):
print("test allreduce min api ok")
# test allreduce min with shape = []
# rank 0
x = np.random.random([]).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random([]).astype(self.dtype)
tensor_y = paddle.to_tensor(y)

min_result = paddle.minimum(tensor_x, tensor_y)

if pg.rank() == 0:
    task = dist.all_reduce(tensor_x, dist.ReduceOp.MIN, sync_op=False)
    task.wait()
    assert np.array_equal(tensor_x, min_result)
else:
    task = dist.all_reduce(tensor_y, dist.ReduceOp.MIN, sync_op=False)
    task.wait()
    assert np.array_equal(tensor_y, min_result)

print("test allreduce min api with shape = [] ok")
# test allreduce prod
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
@@ -131,6 +191,27 @@ class TestProcessGroupFp32(unittest.TestCase):
print("test allreduce prod api ok")
# test allreduce prod with shape = []
# rank 0
x = np.random.random([]).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random([]).astype(self.dtype)
tensor_y = paddle.to_tensor(y)

prod_result = np.multiply(x, y)

if pg.rank() == 0:
    task = dist.all_reduce(tensor_x, dist.ReduceOp.PROD, sync_op=False)
    task.wait()
    assert np.array_equal(tensor_x, prod_result)
else:
    task = dist.all_reduce(tensor_y, dist.ReduceOp.PROD, sync_op=False)
    task.wait()
    assert np.array_equal(tensor_y, prod_result)

print("test allreduce prod api with shape = [] ok")
# test broadcast
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
@@ -236,6 +317,27 @@ class TestProcessGroupFp32(unittest.TestCase):
assert np.array_equal(tensor_y, out_2)
print("test allgather api2 ok\n")
# test allgather with shape = []
# rank 0
x = np.random.random([]).astype(self.dtype)
y = np.random.random([]).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)

tensor_out_list = []
if pg.rank() == 0:
    task = dist.all_gather(tensor_out_list, tensor_x)
    task.wait()
    paddle.device.cuda.synchronize()
# rank 1
else:
    task = dist.all_gather(tensor_out_list, tensor_y, sync_op=False)
    paddle.device.cuda.synchronize()

out_1 = tensor_out_list[0]
out_2 = tensor_out_list[1]
assert np.array_equal(tensor_x, out_1)
assert np.array_equal(tensor_y, out_2)
print("test allgather api with shape = [] ok\n")
# test alltoall
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
......