Unverified commit 866f492f authored by 姜永久, committed by GitHub

Yj/rm custom eager guard tests (#48778)

* remove custom op eager guard tests

* rm custom_runtime eager guard tests

* rm xccl eager tests

* modify

* modify

* set grad retain for custom_relu test

* retain grad for custom relu op

* modify custom_relu test
Parent def2a87f
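
The diff below applies one mechanical pattern throughout: eager mode is now the default, so each `func_*` helper that its `test_*` method previously ran twice (once under `with _test_eager_guard():`, once without) is collapsed into a plain `test_*` method, the `_test_eager_guard` imports are dropped, and the grad-checking relu/tanh tests set `FLAGS_retain_grad_for_all_tensor` explicitly around their checks. Below is a minimal, self-contained sketch of the resulting test shape; `paddle.nn.functional.relu` stands in for the JIT-loaded custom op (an assumption made so the snippet runs without building an extension), and the class name and shapes are illustrative.

# Sketch only: the real tests in this diff first build the op with
# paddle.utils.cpp_extension.load() and call e.g. custom_ops.custom_relu(x).
import unittest

import numpy as np

import paddle
from paddle import fluid


class TestEagerGuardRemovalPattern(unittest.TestCase):
    # Before this commit, the body below lived in a func_* helper and the
    # test_* method ran it twice, once inside `with _test_eager_guard():`.
    # After the commit, the helper body simply becomes the test method.
    def test_dynamic(self):
        # Grad-checking tests now retain gradients via an explicit global
        # flag and restore it afterwards, as done for the custom_relu and
        # custom_tanh double-grad tests in this diff.
        fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": True})
        x = paddle.ones([4, 8], dtype='float32')
        x.stop_gradient = False
        out = paddle.nn.functional.relu(x)  # stand-in for the custom op
        out.backward()  # Paddle fills the output grad with ones by default
        np.testing.assert_array_equal(
            x.grad.numpy(), (x.numpy() > 0).astype('float32')
        )
        fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": False})


if __name__ == '__main__':
    unittest.main()
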
......@@ -19,7 +19,6 @@ import numpy as np
from utils import extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -49,17 +48,12 @@ class TestContextPool(unittest.TestCase):
if paddle.is_compiled_with_cuda():
self.devices.append('gpu')
def use_context_pool(self):
def test_use_context_pool(self):
x = paddle.ones([2, 2], dtype='float32')
out = custom_ops.context_pool_test(x)
np.testing.assert_array_equal(x.numpy(), out.numpy())
def test_using_context_pool(self):
with _test_eager_guard():
self.use_context_pool()
self.use_context_pool()
if __name__ == '__main__':
unittest.main()
......@@ -19,7 +19,6 @@ import numpy as np
from utils import extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -57,7 +56,7 @@ class TestJitCustomAttrs(unittest.TestCase):
self.int64_vec_attr = [10000000000, 10000000000, 10000000000]
self.str_vec_attr = ["StrAttr", "StrAttr", "StrAttr"]
def func_attr_value(self):
def test_func_attr_value(self):
x = paddle.ones([2, 2], dtype='float32')
x.stop_gradient = False
out = custom_attrs.attr_test(
......@@ -77,12 +76,7 @@ class TestJitCustomAttrs(unittest.TestCase):
np.testing.assert_array_equal(x.numpy(), out.numpy())
def test_attr_value(self):
with _test_eager_guard():
self.func_attr_value()
self.func_attr_value()
def func_const_attr_value(self):
def test_const_attr_value(self):
x = paddle.ones([2, 2], dtype='float32')
x.stop_gradient = False
out = custom_attrs.const_attr_test(
......@@ -102,11 +96,6 @@ class TestJitCustomAttrs(unittest.TestCase):
np.testing.assert_array_equal(x.numpy(), out.numpy())
def test_const_attr_value(self):
with _test_eager_guard():
self.func_const_attr_value()
self.func_const_attr_value()
if __name__ == '__main__':
unittest.main()
......@@ -20,7 +20,6 @@ from utils import extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
import paddle.static as static
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -122,7 +121,7 @@ class TestCustomConcatDynamicAxisJit(unittest.TestCase):
),
)
def func_dynamic(self):
def test_dynamic(self):
for dtype in self.dtypes:
for axis in self.axises:
out, grad_inputs = concat_dynamic(
......@@ -136,11 +135,6 @@ class TestCustomConcatDynamicAxisJit(unittest.TestCase):
for x_grad, pd_x_grad in zip(grad_inputs, pd_grad_inputs):
self.check_output(x_grad, pd_x_grad, "x_grad")
def test_dynamic(self):
with _test_eager_guard():
self.func_dynamic()
self.func_dynamic()
def test_static(self):
for dtype in self.dtypes:
for axis in self.axises:
......@@ -155,7 +149,7 @@ class TestCustomConcatDynamicAxisJit(unittest.TestCase):
self.check_output(x1_grad, pd_x1_grad, "x1_grad")
self.check_output(x2_grad, pd_x2_grad, "x2_grad")
def func_dynamic_with_attr(self):
def test_dynamic_with_attr(self):
for dtype in self.dtypes:
for axis in self.axises:
out, grad_inputs = concat_dynamic(
......@@ -173,11 +167,6 @@ class TestCustomConcatDynamicAxisJit(unittest.TestCase):
for x_grad, pd_x_grad in zip(grad_inputs, pd_grad_inputs):
self.check_output(x_grad, pd_x_grad, "x_grad")
def test_dynamic_with_attr(self):
with _test_eager_guard():
self.func_dynamic_with_attr()
self.func_dynamic_with_attr()
def test_static_with_attr(self):
for dtype in self.dtypes:
for axis in self.axises:
......
......@@ -20,7 +20,6 @@ from utils import extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
import paddle.static as static
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -128,16 +127,11 @@ class TestCustomConjJit(unittest.TestCase):
self.check_output(out, pd_out, "out")
self.check_output(x_grad, pd_x_grad, "x's grad")
def func_dynamic(self):
def test_dynamic(self):
for dtype in self.dtypes:
np_input = np.random.random(self.shape).astype(dtype)
self.run_dynamic(dtype, np_input)
def test_dynamic(self):
with _test_eager_guard():
self.func_dynamic()
self.func_dynamic()
def test_static(self):
for dtype in self.dtypes:
np_input = np.random.random(self.shape).astype(dtype)
......
......@@ -21,7 +21,6 @@ from utils import extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
import paddle.nn.functional as F
import paddle.static as static
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -140,7 +139,7 @@ class TestCustomLinearJit(unittest.TestCase):
)
self.check_output(phi_bias_grad, pd_bias_grad, "bias_grad")
def func_dynamic(self):
def test_dynamic(self):
for device in self.devices:
for dtype in self.dtypes:
(
......@@ -176,11 +175,6 @@ class TestCustomLinearJit(unittest.TestCase):
)
self.check_output(phi_bias_grad, pd_bias_grad, "bias_grad")
def test_dynamic(self):
with _test_eager_guard():
self.func_dynamic()
self.func_dynamic()
if __name__ == "__main__":
unittest.main()
......@@ -21,7 +21,7 @@ from utils import IS_MAC, extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
from paddle import nn
from paddle.fluid.framework import _in_legacy_dygraph, _test_eager_guard
from paddle.fluid.framework import _in_legacy_dygraph
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -115,7 +115,7 @@ class TestDygraphModel(unittest.TestCase):
shape=[None, self.in_dim], dtype='float32', name='x'
)
def func_train_eval(self):
def test_train_eval(self):
for device in self.devices:
# set device
paddle.set_device(device)
......@@ -151,11 +151,6 @@ class TestDygraphModel(unittest.TestCase):
origin_relu_eval_out, custom_relu_eval_out
)
def test_train_eval(self):
with _test_eager_guard():
self.func_train_eval()
self.func_train_eval()
def train_model(self, use_custom_op=False, dy2stat=False):
# reset random seed
paddle.seed(self.seed)
......
......@@ -20,7 +20,6 @@ from test_custom_relu_op_setup import custom_relu_dynamic, custom_relu_static
from utils import IS_MAC, extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -85,7 +84,7 @@ class TestJITLoad(unittest.TestCase):
),
)
def func_dynamic(self):
def test_dynamic(self):
for device in self.devices:
for dtype in self.dtypes:
if device == 'cpu' and dtype == 'float16':
......@@ -113,12 +112,7 @@ class TestJITLoad(unittest.TestCase):
),
)
def test_dynamic(self):
with _test_eager_guard():
self.func_dynamic()
self.func_dynamic()
def func_exception(self):
def test_exception(self):
caught_exception = False
try:
x = np.random.uniform(-1, 1, [4, 8]).astype('int32')
......@@ -143,11 +137,6 @@ class TestJITLoad(unittest.TestCase):
self.assertTrue("custom_relu_op.cu" in str(e))
self.assertTrue(caught_exception)
def test_exception(self):
with _test_eager_guard():
self.func_exception()
self.func_exception()
def test_load_multiple_module(self):
custom_module = load(
name='custom_conj_jit',
......
......@@ -21,7 +21,7 @@ import numpy as np
import paddle
import paddle.static as static
from paddle.fluid.framework import _test_eager_guard
from paddle import fluid
from paddle.utils.cpp_extension.extension_utils import run_cmd
from paddle.vision.transforms import Compose, Normalize
......@@ -251,7 +251,8 @@ class TestNewCustomOpSetUpInstall(unittest.TestCase):
),
)
def func_dynamic(self):
def test_dynamic(self):
fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": True})
for device in self.devices:
for dtype in self.dtypes:
if device == 'cpu' and dtype == 'float16':
......@@ -278,11 +279,7 @@ class TestNewCustomOpSetUpInstall(unittest.TestCase):
x_grad, pd_x_grad
),
)
def test_dynamic(self):
with _test_eager_guard():
self.func_dynamic()
self.func_dynamic()
fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": False})
def test_static_save_and_load_inference_model(self):
paddle.enable_static()
......@@ -350,6 +347,7 @@ class TestNewCustomOpSetUpInstall(unittest.TestCase):
paddle.disable_static()
def test_func_double_grad_dynamic(self):
fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": True})
for device in self.devices:
for dtype in self.dtypes:
if device == 'cpu' and dtype == 'float16':
......@@ -375,6 +373,7 @@ class TestNewCustomOpSetUpInstall(unittest.TestCase):
dx_grad, pd_dx_grad
),
)
fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": False})
def test_with_dataloader(self):
for device in self.devices:
......@@ -395,6 +394,7 @@ class TestNewCustomOpSetUpInstall(unittest.TestCase):
)
for batch_id, (image, _) in enumerate(train_loader()):
image = paddle.to_tensor(image)
out = self.custom_ops[0](image)
pd_out = paddle.nn.functional.relu(image)
np.testing.assert_array_equal(
......
......@@ -19,7 +19,6 @@ import numpy as np
from utils import extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -43,7 +42,7 @@ custom_ops = load(
class TestCustomSimpleSliceJit(unittest.TestCase):
def func_slice_output(self):
def test_slice_output(self):
np_x = np.random.random((5, 2)).astype("float32")
x = paddle.to_tensor(np_x)
custom_op_out = custom_ops.custom_simple_slice(x, 2, 3)
......@@ -56,11 +55,6 @@ class TestCustomSimpleSliceJit(unittest.TestCase):
),
)
def test_slice_output(self):
with _test_eager_guard():
self.func_slice_output()
self.func_slice_output()
if __name__ == "__main__":
unittest.main()
......@@ -20,7 +20,6 @@ from utils import extra_cc_args, extra_nvcc_args, paddle_includes
import paddle
import paddle.fluid as fluid
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -68,7 +67,8 @@ class TestCustomTanhDoubleGradJit(unittest.TestCase):
self.dtypes = ['float32', 'float64']
self.devices = ['cpu']
def func_double_grad_dynamic(self):
def test_double_grad_dynamic(self):
fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": True})
for device in self.devices:
for dtype in self.dtypes:
x = np.random.uniform(-1, 1, [4, 8]).astype(dtype)
......@@ -102,12 +102,6 @@ class TestCustomTanhDoubleGradJit(unittest.TestCase):
dout, pd_dout
),
)
def test_func_double_grad_dynamic(self):
fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": True})
with _test_eager_guard():
self.func_double_grad_dynamic()
self.func_double_grad_dynamic()
fluid.set_flags({"FLAGS_retain_grad_for_all_tensor": False})
......
......@@ -19,7 +19,6 @@ import numpy as np
from utils import extra_cc_args, paddle_includes
import paddle
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -43,7 +42,7 @@ class TestJitDispatch(unittest.TestCase):
def setUp(self):
paddle.set_device('cpu')
def run_dispatch_test_impl(self, func, dtype):
def run_dispatch_test(self, func, dtype):
np_x = np.ones([2, 2]).astype(dtype)
x = paddle.to_tensor(np_x)
out = func(x)
......@@ -56,11 +55,6 @@ class TestJitDispatch(unittest.TestCase):
err_msg='custom op x: {},\n custom op out: {}'.format(np_x, np_out),
)
def run_dispatch_test(self, func, dtype):
with _test_eager_guard():
self.run_dispatch_test_impl(func, dtype)
self.run_dispatch_test_impl(func, dtype)
def test_dispatch_integer(self):
dtypes = ["int32", "int64", "int8", "uint8", "int16"]
for dtype in dtypes:
......
......@@ -19,7 +19,6 @@ import numpy as np
from utils import extra_cc_args, paddle_includes
import paddle
from paddle.fluid.framework import _test_eager_guard
from paddle.utils.cpp_extension import get_build_directory, load
from paddle.utils.cpp_extension.extension_utils import run_cmd
......@@ -89,7 +88,7 @@ class TestMultiOutputDtypes(unittest.TestCase):
self.check_multi_outputs(res)
paddle.disable_static()
def func_dynamic(self):
def test_dynamic(self):
for device in self.devices:
for dtype in self.dtypes:
paddle.set_device(device)
......@@ -100,11 +99,6 @@ class TestMultiOutputDtypes(unittest.TestCase):
self.assertTrue(len(outs) == 3)
self.check_multi_outputs(outs, True)
def test_dynamic(self):
with _test_eager_guard():
self.func_dynamic()
self.func_dynamic()
if __name__ == '__main__':
unittest.main()
......@@ -20,7 +20,6 @@ import numpy as np
import paddle
from paddle.fluid import core
from paddle.fluid.dygraph.parallel import ParallelEnv
from paddle.fluid.framework import _test_eager_guard
def init_process_group(strategy=None):
......@@ -50,192 +49,187 @@ class TestProcessGroupFp32(unittest.TestCase):
self.shape = (2, 10, 5)
def test_create_process_group_xccl(self):
with _test_eager_guard():
device_id = paddle.distributed.ParallelEnv().dev_id
paddle.set_device('custom_cpu:%d' % device_id)
pg = init_process_group()
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.all_reduce(tensor_x, core.ReduceOp.SUM, sync_op=True)
task.wait()
# assert np.array_equal(tensor_x, sum_result)
else:
task = pg.all_reduce(tensor_y, core.ReduceOp.SUM, sync_op=True)
task.wait()
# assert np.array_equal(tensor_y, sum_result)
print("test allreduce sum api ok")
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
max_result = paddle.maximum(tensor_x, tensor_y)
if pg.rank() == 0:
task = pg.all_reduce(tensor_x, core.ReduceOp.MAX, sync_op=True)
task.wait()
# assert np.array_equal(tensor_x, max_result)
else:
task = pg.all_reduce(tensor_y, core.ReduceOp.MAX, sync_op=True)
task.wait()
# assert np.array_equal(tensor_y, max_result)
print("test allreduce max api ok")
# test broadcast
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
broadcast_result = paddle.assign(tensor_x)
if pg.rank() == 0:
task = pg.broadcast(tensor_x, 0, sync_op=True)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
assert task.is_completed()
# assert np.array_equal(broadcast_result, tensor_x)
else:
task = pg.broadcast(tensor_y, 0, sync_op=True)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
assert task.is_completed()
# assert np.array_equal(broadcast_result, tensor_y)
print("test broadcast api ok")
# test barrier
# rank 0
if pg.rank() == 0:
task = pg.barrier(device_id)
task.wait()
# rank 1
else:
task = pg.barrier(device_id)
task.wait()
print("test barrier api ok\n")
return
# test allgather
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
out_shape = list(self.shape)
out_shape[0] *= 2
out = np.random.random(out_shape).astype(self.dtype)
tensor_out = paddle.to_tensor(out)
if pg.rank() == 0:
task = pg.all_gather(tensor_out, tensor_x, sync_op=True)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.all_gather(tensor_out, tensor_y, sync_op=True)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
out_2 = paddle.slice(
tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
)
# assert np.array_equal(tensor_x, out_1)
# assert np.array_equal(tensor_y, out_2)
print("test allgather api ok\n")
# test alltoall
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
out1 = np.random.random(self.shape).astype(self.dtype)
out2 = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
tensor_out1 = paddle.to_tensor(out1)
tensor_out2 = paddle.to_tensor(out2)
raw_tensor_x_2 = paddle.slice(
tensor_x, [0], [self.shape[0] // 2], [self.shape[0]]
)
raw_tensor_y_1 = paddle.slice(
tensor_y, [0], [0], [self.shape[0] // 2]
)
if pg.rank() == 0:
task = pg.alltoall(tensor_x, tensor_out1)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.alltoall(tensor_y, tensor_out2)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out1_2 = paddle.slice(
tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]]
)
out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2])
# if pg.rank() == 0:
# assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy())
# else:
# assert np.array_equal(out2_1, raw_tensor_x_2)
print("test alltoall api ok\n")
# test Reduce
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.reduce(tensor_x, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.reduce(tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# if pg.rank() == 0:
# assert np.array_equal(tensor_x, sum_result)
print("test reduce sum api ok\n")
# test Scatter
# rank 0
in_shape = list(self.shape)
in_shape[0] *= 2
x = np.random.random(in_shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
if pg.rank() == 0:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
out2 = paddle.slice(
tensor_x, [0], [self.shape[0]], [self.shape[0] * 2]
)
# if pg.rank() == 0:
# assert np.array_equal(tensor_y, out1)
# else:
# assert np.array_equal(tensor_y, out2)
print("test scatter api ok\n")
device_id = paddle.distributed.ParallelEnv().dev_id
paddle.set_device('custom_cpu:%d' % device_id)
pg = init_process_group()
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.all_reduce(tensor_x, core.ReduceOp.SUM, sync_op=True)
task.wait()
# assert np.array_equal(tensor_x, sum_result)
else:
task = pg.all_reduce(tensor_y, core.ReduceOp.SUM, sync_op=True)
task.wait()
# assert np.array_equal(tensor_y, sum_result)
print("test allreduce sum api ok")
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
max_result = paddle.maximum(tensor_x, tensor_y)
if pg.rank() == 0:
task = pg.all_reduce(tensor_x, core.ReduceOp.MAX, sync_op=True)
task.wait()
# assert np.array_equal(tensor_x, max_result)
else:
task = pg.all_reduce(tensor_y, core.ReduceOp.MAX, sync_op=True)
task.wait()
# assert np.array_equal(tensor_y, max_result)
print("test allreduce max api ok")
# test broadcast
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
# rank 1
y = np.random.random(self.shape).astype(self.dtype)
tensor_y = paddle.to_tensor(y)
broadcast_result = paddle.assign(tensor_x)
if pg.rank() == 0:
task = pg.broadcast(tensor_x, 0, sync_op=True)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
assert task.is_completed()
# assert np.array_equal(broadcast_result, tensor_x)
else:
task = pg.broadcast(tensor_y, 0, sync_op=True)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
assert task.is_completed()
# assert np.array_equal(broadcast_result, tensor_y)
print("test broadcast api ok")
# test barrier
# rank 0
if pg.rank() == 0:
task = pg.barrier(device_id)
task.wait()
# rank 1
else:
task = pg.barrier(device_id)
task.wait()
print("test barrier api ok\n")
return
# test allgather
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
out_shape = list(self.shape)
out_shape[0] *= 2
out = np.random.random(out_shape).astype(self.dtype)
tensor_out = paddle.to_tensor(out)
if pg.rank() == 0:
task = pg.all_gather(tensor_out, tensor_x, sync_op=True)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.all_gather(tensor_out, tensor_y, sync_op=True)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out_1 = paddle.slice(tensor_out, [0], [0], [out_shape[0] // 2])
out_2 = paddle.slice(
tensor_out, [0], [out_shape[0] // 2], [out_shape[0]]
)
# assert np.array_equal(tensor_x, out_1)
# assert np.array_equal(tensor_y, out_2)
print("test allgather api ok\n")
# test alltoall
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
out1 = np.random.random(self.shape).astype(self.dtype)
out2 = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
tensor_out1 = paddle.to_tensor(out1)
tensor_out2 = paddle.to_tensor(out2)
raw_tensor_x_2 = paddle.slice(
tensor_x, [0], [self.shape[0] // 2], [self.shape[0]]
)
raw_tensor_y_1 = paddle.slice(tensor_y, [0], [0], [self.shape[0] // 2])
if pg.rank() == 0:
task = pg.alltoall(tensor_x, tensor_out1)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.alltoall(tensor_y, tensor_out2)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out1_2 = paddle.slice(
tensor_out1, [0], [self.shape[0] // 2], [self.shape[0]]
)
out2_1 = paddle.slice(tensor_out2, [0], [0], [self.shape[0] // 2])
# if pg.rank() == 0:
# assert np.array_equal(out1_2.numpy(), raw_tensor_y_1.numpy())
# else:
# assert np.array_equal(out2_1, raw_tensor_x_2)
print("test alltoall api ok\n")
# test Reduce
# rank 0
x = np.random.random(self.shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
sum_result = tensor_x + tensor_y
if pg.rank() == 0:
task = pg.reduce(tensor_x, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.reduce(tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# if pg.rank() == 0:
# assert np.array_equal(tensor_x, sum_result)
print("test reduce sum api ok\n")
# test Scatter
# rank 0
in_shape = list(self.shape)
in_shape[0] *= 2
x = np.random.random(in_shape).astype(self.dtype)
y = np.random.random(self.shape).astype(self.dtype)
tensor_x = paddle.to_tensor(x)
tensor_y = paddle.to_tensor(y)
if pg.rank() == 0:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
# rank 1
else:
task = pg.scatter(tensor_x, tensor_y, 0)
task.wait()
# paddle.fluid.core._custom_device_synchronize("custom_cpu", -1)
out1 = paddle.slice(tensor_x, [0], [0], [self.shape[0]])
out2 = paddle.slice(tensor_x, [0], [self.shape[0]], [self.shape[0] * 2])
# if pg.rank() == 0:
# assert np.array_equal(tensor_y, out1)
# else:
# assert np.array_equal(tensor_y, out2)
print("test scatter api ok\n")
if __name__ == "__main__":
......
......@@ -50,18 +50,12 @@ class TestCustomCPUPlugin(unittest.TestCase):
del os.environ['CUSTOM_DEVICE_ROOT']
def test_custom_device(self):
import paddle
with paddle.fluid.framework._test_eager_guard():
self._test_custom_device_dataloader()
self._test_custom_device_mnist()
self._test_eager_backward_api()
self._test_eager_copy_to()
self._test_fallback_kernel()
self._test_scalar()
self._test_custom_device_gradient_accumulation()
self._test_custom_device_dataloader()
self._test_custom_device_mnist()
self._test_eager_backward_api()
self._test_eager_copy_to()
self._test_fallback_kernel()
self._test_scalar()
def _test_custom_device_dataloader(self):
import paddle
......
......@@ -47,13 +47,7 @@ class TestCustomCPUProfilerPlugin(unittest.TestCase):
self.temp_dir.cleanup()
del os.environ['CUSTOM_DEVICE_ROOT']
def test_custom_device(self):
import paddle
with paddle.fluid.framework._test_eager_guard():
self._test_custom_profiler()
def _test_custom_profiler(self):
def test_custom_profiler(self):
import paddle
import paddle.profiler as profiler
......