Unverified Commit 1cffb1ff authored by kangguangli, committed by GitHub

[with_data_parallel][part7] remove with_data_parallel in custom op test (#51164)

* remove with_data_parallel in custom op test

* finish TestCustomOpReluModelStaticMultiDevice

* fix typo

* add checks for relu output

* fix ci

* fix ci

* fix compile checks

* fix coverage ci
Parent af2c31a6
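The change removes the legacy CompiledProgram(...).with_data_parallel(...) path from the custom-op tests: programs are now executed directly, and multi-device coverage moves to a new test driven by paddle.distributed.launch and fleet. A minimal sketch of the replacement single-program execution, with illustrative names and shapes rather than code taken from this diff:

import numpy as np
import paddle
import paddle.static as static

paddle.enable_static()
with static.scope_guard(static.Scope()):
    with static.program_guard(static.Program()):
        x = static.data(name='X', shape=[None, 8], dtype='float32')
        out = paddle.nn.functional.relu(x)

        exe = static.Executor()
        exe.run(static.default_startup_program())
        # No CompiledProgram(...).with_data_parallel(...) step any more:
        # the default main program is executed as-is.
        out_v = exe.run(
            static.default_main_program(),
            feed={'X': np.random.uniform(-1, 1, [4, 8]).astype('float32')},
            fetch_list=[out],
        )
paddle.disable_static()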
......@@ -19,10 +19,21 @@ if(WITH_GPU OR APPLE)
endif()
endif()
if(WITH_XPU)
if(WITH_GPU AND WITH_DISTRIBUTE)
py_test(test_custom_op_relu_model_static_multidevice
SRCS test_custom_op_relu_model_static_multidevice.py)
set_tests_properties(test_custom_op_relu_model_static_multidevice
PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 180)
endif()
if(WITH_XPU AND WITH_DISTRIBUTE)
set(CUSTOM_XPU_ENVS FLAGS_init_allocated_mem=0)
py_test(test_custom_relu_op_xpu_setup SRCS test_custom_relu_op_xpu_setup.py
ENVS ${CUSTOM_XPU_ENVS})
py_test(test_custom_op_relu_model_static_multidevice
SRCS test_custom_op_relu_model_static_multidevice.py ENVS
${CUSTOM_XPU_ENVS})
set_tests_properties(test_custom_op_relu_model_static_multidevice
PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" TIMEOUT 180)
endif()
py_test(test_custom_raw_op_kernel_op SRCS test_custom_raw_op_kernel_op.py)
......
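The CMake changes above register the new test only when WITH_DISTRIBUTE is enabled together with GPU or XPU support; the test additionally requires more than one visible device at runtime. A hedged sketch of that runtime guard, mirroring the assertion in TestCustomOpReluModelStaticMultiDevice further below:

import paddle

count = 0
if paddle.framework.core.is_compiled_with_cuda():
    count = paddle.framework.core.get_cuda_device_count()
elif paddle.framework.core.is_compiled_with_xpu():
    count = paddle.framework.core.get_xpu_device_count()
# The multi-device comparison is meaningless with fewer than two devices.
assert count > 1, "this test needs at least two GPU/XPU devices"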
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import argparse
import os
import random
import numpy as np
from custom_setup_op_relu_model_static_multidevices import custom_relu
import paddle
import paddle.vision.transforms as T
from paddle import nn
from paddle.distributed import fleet
batch_size = 32


def get_program(args):
main_program = paddle.static.Program()
startup_program = paddle.static.Program()
with paddle.static.program_guard(main_program, startup_program):
x = paddle.static.data(
shape=[batch_size, 1, 28, 28], name='x', dtype='float32'
)
x = paddle.flatten(x, start_axis=1)
y = paddle.static.data(shape=[batch_size, 1], name='y', dtype='int64')
y = paddle.cast(y, dtype='float32')
in_dim = 784
out_dim = 10
fc1 = nn.Linear(in_dim, in_dim)
fc2 = nn.Linear(in_dim, out_dim)
relu_act = custom_relu if args.use_custom_op else nn.functional.relu
out = fc1(x)
relu_out1 = relu_act(out)
out = fc2(relu_out1)
relu_out2 = relu_act(out)
out = paddle.mean(relu_out2, axis=-1)
loss = nn.functional.mse_loss(out, y)
if args.train_mode:
sgd = paddle.optimizer.SGD(learning_rate=0.01)
opt = fleet.distributed_optimizer(sgd)
opt.minimize(loss)
return main_program, startup_program, [loss, relu_out1, relu_out2]


def get_dataloader(mode='train'):
transform = T.Compose(
[
T.Normalize(
mean=[127.5],
std=[127.5],
),
]
)
train_dataset = paddle.vision.datasets.MNIST(mode=mode, transform=transform)
sampler = paddle.io.DistributedBatchSampler(
train_dataset, shuffle=False, drop_last=True, batch_size=batch_size
)
train_loader = paddle.io.DataLoader(train_dataset, batch_sampler=sampler)
return train_loader


def train(args):
main_program, startup_program, fetch_list = get_program(args)
exe = paddle.static.Executor()
exe.run(startup_program)
losses = []
relu_out1_list = []
relu_out2_list = []
for x_data, y_data in get_dataloader():
loss, relu_out1, relu_out2 = exe.run(
main_program,
feed={'x': x_data, 'y': y_data},
fetch_list=fetch_list,
)
losses.append(loss)
relu_out1_list.append(relu_out1)
relu_out2_list.append(relu_out2)
losses = np.array(losses)
relu_out1_list = np.array(relu_out1_list)
relu_out2_list = np.array(relu_out2_list)
rank = paddle.distributed.get_rank()
np.savez(
os.path.join(
args.output_dir, 'train_{}_{}.npz'.format(rank, args.use_custom_op)
),
losses=losses,
relu_out1_list=relu_out1_list,
relu_out2_list=relu_out2_list,
)
if rank != 0:
model_path = os.path.join(args.model_dir, str(args.use_custom_op))
paddle.static.save(main_program, model_path)


def eval(args):
main_program, startup_program, fetch_list = get_program(args)
exe = paddle.static.Executor()
exe.run(startup_program)
model_path = os.path.join(args.model_dir, str(args.use_custom_op))
paddle.static.load(main_program, model_path, exe)
losses = []
relu_out1_list = []
relu_out2_list = []
for x_data, y_data in get_dataloader():
loss, relu_out1, relu_out2 = exe.run(
main_program,
feed={'x': x_data, 'y': y_data},
fetch_list=fetch_list,
)
losses.append(loss)
relu_out1_list.append(relu_out1)
relu_out2_list.append(relu_out2)
losses = np.array(losses)
relu_out1_list = np.array(relu_out1_list)
relu_out2_list = np.array(relu_out2_list)
rank = paddle.distributed.get_rank()
np.savez(
os.path.join(
args.output_dir, 'eval_{}_{}.npz'.format(rank, args.use_custom_op)
),
losses=losses,
relu_out1_list=relu_out1_list,
relu_out2_list=relu_out2_list,
)


if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('--output_dir', type=str, required=True)
parser.add_argument('--model_dir', type=str, required=True)
parser.add_argument('--use_custom_op', action='store_true')
parser.add_argument('--train_mode', action='store_true')
args = parser.parse_args()
paddle.enable_static()
paddle.seed(0)
np.random.seed(0)
random.seed(0)
fleet.init()
if args.train_mode:
train(args)
else:
eval(args)
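The script above is not executed directly by the unit test; it is spawned once per visible device through paddle.distributed.launch, with fleet.init() setting up the process group. A minimal sketch of that invocation, using illustrative temporary directories (the test below builds the same command in its train() and eval() helpers):

import subprocess
import tempfile

output_dir = tempfile.TemporaryDirectory()
model_dir = tempfile.TemporaryDirectory()
log_dir = tempfile.TemporaryDirectory()

cmd = [
    "python", "-m", "paddle.distributed.launch",
    "--log_dir", log_dir.name,
    "custom_op_multidevice_model_train.py",
    "--output_dir", output_dir.name,
    "--model_dir", model_dir.name,
    "--use_custom_op",  # omit to use paddle.nn.functional.relu instead
    "--train_mode",     # omit to run the eval() branch on the saved model
]
subprocess.run(cmd, check=True)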
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from utils import IS_MAC, extra_compile_args, paddle_includes
import paddle
from paddle.utils.cpp_extension import CppExtension, CUDAExtension, setup

if paddle.framework.core.is_compiled_with_xpu():
source_files = ['custom_relu_op_xpu.cc']
setup(
name='custom_setup_op_relu_model_static_multidevices',
        ext_modules=CppExtension(  # XPU builds cannot use CUDAExtension
sources=['custom_relu_op_xpu.cc'],
include_dirs=paddle_includes,
extra_compile_args=extra_compile_args,
verbose=True,
),
)
else:
source_files = ['custom_relu_op.cc']
if not IS_MAC:
source_files.append('custom_relu_op.cu')
setup(
name='custom_setup_op_relu_model_static_multidevices',
ext_modules=CUDAExtension(
sources=source_files,
include_dirs=paddle_includes,
extra_compile_args=extra_compile_args,
verbose=True,
),
)
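Before custom_relu can be imported by the training script, the extension defined above has to be compiled and installed; the unit test does this in setUp(). A hedged sketch, assuming the setup() call above lives in setup_for_static_multidevice_test.py, the file name the test invokes:

import subprocess

# Build and install the custom op package defined by the setup() call above.
subprocess.run(
    ["python", "setup_for_static_multidevice_test.py", "install"], check=True
)

# After installation the registered operator is importable by name.
from custom_setup_op_relu_model_static_multidevices import custom_relu  # noqa: E402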
# Copyright (c) 2023 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import tempfile
import unittest


class TestCustomOpReluModelStaticMultiDevice(unittest.TestCase):
def install_custom_op(self):
cmds = [
"python",
"setup_for_static_multidevice_test.py",
"install",
]
p = subprocess.run(cmds)
assert p.returncode == 0, f"Install Custom Op: Failed: {p}"
def setUp(self):
self.fleet_log_dir = tempfile.TemporaryDirectory()
self.model_dir = tempfile.TemporaryDirectory()
self.output_log_dir = tempfile.TemporaryDirectory()
self.install_custom_op()
def train(self, use_custom_op: bool = True):
cmds = [
"python",
"-m",
"paddle.distributed.launch",
]
cmds += ["--log_dir", self.fleet_log_dir.name]
cmds += ["custom_op_multidevice_model_train.py"]
cmds += ["--output_dir", self.output_log_dir.name]
cmds += ["--model_dir", self.model_dir.name]
if use_custom_op:
cmds += ["--use_custom_op"]
cmds += ["--train_mode"]
p = subprocess.run(cmds)
assert p.returncode == 0, f"Fleet train: Failed: {p}"
def eval(self, use_custom_op: bool = True):
cmds = [
"python",
"-m",
"paddle.distributed.launch",
]
cmds += ["--log_dir", self.fleet_log_dir.name]
cmds += ["custom_op_multidevice_model_train.py"]
cmds += ["--output_dir", self.output_log_dir.name]
cmds += ["--model_dir", self.model_dir.name]
if use_custom_op:
cmds += ["--use_custom_op"]
p = subprocess.run(cmds)
assert p.returncode == 0, f"Fleet eval: Failed: {p}"
def tearDown(self):
self.fleet_log_dir.cleanup()
self.model_dir.cleanup()
self.output_log_dir.cleanup()
def test_train_and_eval(self):
self.train(use_custom_op=True)
self.train(use_custom_op=False)
import numpy as np
import paddle
count = 0
if paddle.framework.core.is_compiled_with_cuda():
count = paddle.framework.core.get_cuda_device_count()
elif paddle.framework.core.is_compiled_with_xpu():
count = paddle.framework.core.get_xpu_device_count()
assert (
count > 1
), "TestCustomOpReluModelStaticMultiDevice needs at least two devices"
for id in range(count):
loss_custom = np.load(
os.path.join(
self.output_log_dir.name, 'train_{}_{}.npz'.format(id, True)
)
)
loss_origin = np.load(
os.path.join(
self.output_log_dir.name,
'train_{}_{}.npz'.format(id, False),
)
)
np.testing.assert_array_equal(
loss_custom['losses'], loss_origin['losses']
)
np.testing.assert_array_equal(
loss_custom['relu_out1_list'], loss_origin['relu_out1_list']
)
np.testing.assert_array_equal(
loss_custom['relu_out2_list'], loss_origin['relu_out2_list']
)
self.eval(use_custom_op=True)
self.eval(use_custom_op=False)
for id in range(count):
loss_custom = np.load(
os.path.join(
self.output_log_dir.name, 'eval_{}_{}.npz'.format(id, True)
)
)
loss_origin = np.load(
os.path.join(
self.output_log_dir.name, 'eval_{}_{}.npz'.format(id, False)
)
)
np.testing.assert_array_equal(
loss_custom['losses'], loss_origin['losses']
)
np.testing.assert_array_equal(
loss_custom['relu_out1_list'], loss_origin['relu_out1_list']
)
np.testing.assert_array_equal(
loss_custom['relu_out2_list'], loss_origin['relu_out2_list']
)


if __name__ == '__main__':
unittest.main()
......@@ -213,7 +213,7 @@ class TestStaticModel(unittest.TestCase):
self.temp_dir = tempfile.TemporaryDirectory()
self.model_save_dir = os.path.join(self.temp_dir.name, 'infer_model')
self.model_path_template = os.path.join(
self.model_save_dir, 'custom_relu_static_model_{}_{}'
self.model_save_dir, 'custom_relu_static_model_{}'
)
paddle.enable_static()
......@@ -229,42 +229,22 @@ class TestStaticModel(unittest.TestCase):
device, use_custom_op=False
)
custom_relu_train_out = self.train_model(device, use_custom_op=True)
# using PE
original_relu_train_pe_out = self.train_model(
device, use_custom_op=False, use_pe=True
)
custom_relu_train_pe_out = self.train_model(
device, use_custom_op=True, use_pe=True
)
np.testing.assert_array_equal(
original_relu_train_out, custom_relu_train_out
)
np.testing.assert_array_equal(
original_relu_train_pe_out, custom_relu_train_pe_out
)
# for eval
original_relu_eval_out = self.eval_model(
device, use_custom_op=False
)
custom_relu_eval_out = self.eval_model(device, use_custom_op=True)
# using PE
original_relu_eval_pe_out = self.eval_model(
device, use_custom_op=False, use_pe=True
)
custom_relu_eval_pe_out = self.eval_model(
device, use_custom_op=True, use_pe=True
)
np.testing.assert_array_equal(
original_relu_eval_out, custom_relu_eval_out
)
np.testing.assert_array_equal(
original_relu_eval_pe_out, custom_relu_eval_pe_out
)
def train_model(self, device, use_custom_op=False, use_pe=False):
def train_model(self, device, use_custom_op=False):
# reset random seed
paddle.seed(self.seed)
np.random.seed(self.seed)
......@@ -292,18 +272,7 @@ class TestStaticModel(unittest.TestCase):
exe = exe = paddle.static.Executor()
exe.run(paddle.static.default_startup_program())
# For PE
if use_pe:
places = (
paddle.static.cpu_places()
if device == 'cpu'
else paddle.static.cuda_places()
)
main_program = paddle.static.CompiledProgram(
paddle.static.default_main_program()
).with_data_parallel(loss_name=loss.name, places=places)
else:
main_program = paddle.static.default_main_program()
main_program = paddle.static.default_main_program()
for batch_id in range(self.batch_num):
x_data = self.datas[batch_id]
......@@ -317,7 +286,7 @@ class TestStaticModel(unittest.TestCase):
# save model
paddle.static.save_inference_model(
self.model_path_template.format(use_custom_op, use_pe),
self.model_path_template.format(use_custom_op),
[x],
[out],
exe,
......@@ -325,7 +294,7 @@ class TestStaticModel(unittest.TestCase):
return res[0]
def eval_model(self, device, use_custom_op=False, use_pe=False):
def eval_model(self, device, use_custom_op=False):
paddle.set_device(device)
with paddle.static.scope_guard(paddle.static.Scope()):
......@@ -337,7 +306,7 @@ class TestStaticModel(unittest.TestCase):
feed_target_names,
fetch_targets,
] = paddle.static.load_inference_model(
self.model_path_template.format(use_custom_op, use_pe), exe
self.model_path_template.format(use_custom_op), exe
)
x_data = self.datas[0]
......
......@@ -68,33 +68,6 @@ def custom_relu_static(
return out_v
def custom_relu_static_pe(func, device, dtype, np_x, use_func=True):
paddle.enable_static()
paddle.set_device(device)
places = static.cpu_places() if device == 'cpu' else static.cuda_places()
with static.scope_guard(static.Scope()):
with static.program_guard(static.Program()):
x = static.data(name='X', shape=[None, 8], dtype=dtype)
x.stop_gradient = False
out = func(x) if use_func else paddle.nn.functional.relu(x)
static.append_backward(out)
exe = static.Executor()
exe.run(static.default_startup_program())
# in static graph mode, x data has been covered by out
compiled_prog = static.CompiledProgram(
static.default_main_program()
).with_data_parallel(loss_name=out.name, places=places)
out_v = exe.run(
compiled_prog, feed={'X': np_x}, fetch_list=[out.name]
)
paddle.disable_static()
return out_v
def custom_relu_static_inference(func, device, np_data, np_label, path_prefix):
paddle.set_device(device)
......@@ -240,25 +213,6 @@ class TestNewCustomOpSetUpInstall(unittest.TestCase):
),
)
def test_static_pe(self):
for device in self.devices:
for dtype in self.dtypes:
if device == 'cpu' and dtype == 'float16':
continue
x = np.random.uniform(-1, 1, [4, 8]).astype(dtype)
for custom_op in self.custom_ops:
out = custom_relu_static_pe(custom_op, device, dtype, x)
pd_out = custom_relu_static_pe(
custom_op, device, dtype, x, False
)
np.testing.assert_array_equal(
out,
pd_out,
err_msg='custom op out: {},\n paddle api out: {}'.format(
out, pd_out
),
)
def test_dynamic(self):
for device in self.devices:
for dtype in self.dtypes:
......
......@@ -70,33 +70,6 @@ def custom_relu_static(
return out_v
def custom_relu_static_pe(func, device, dtype, np_x, use_func=True):
paddle.enable_static()
paddle.set_device(device)
places = static.xpu_places()
with static.scope_guard(static.Scope()):
with static.program_guard(static.Program()):
x = static.data(name='X', shape=[None, 8], dtype=dtype)
x.stop_gradient = False
out = func(x) if use_func else paddle.nn.functional.relu(x)
static.append_backward(out)
exe = static.Executor()
exe.run(static.default_startup_program())
# in static graph mode, x data has been covered by out
compiled_prog = static.CompiledProgram(
static.default_main_program()
).with_data_parallel(loss_name=out.name, places=places)
out_v = exe.run(
compiled_prog, feed={'X': np_x}, fetch_list=[out.name]
)
paddle.disable_static()
return out_v
def custom_relu_static_inference(func, device, np_data, np_label, path_prefix):
paddle.set_device(device)
......@@ -218,22 +191,6 @@ class TestNewCustomOpXpuSetUpInstall(unittest.TestCase):
),
)
def test_static_pe(self):
for dtype in self.dtypes:
x = np.random.uniform(-1, 1, [4, 8]).astype(dtype)
out = custom_relu_static_pe(self.custom_op, self.device, dtype, x)
pd_out = custom_relu_static_pe(
self.custom_op, self.device, dtype, x, False
)
np.testing.assert_allclose(
out,
pd_out,
atol=1e-2,
err_msg='custom op out: {},\n paddle api out: {}'.format(
out, pd_out
),
)
def test_dynamic(self):
for dtype in self.dtypes:
x = np.random.uniform(-1, 1, [4, 8]).astype(dtype)
......
......@@ -69,37 +69,6 @@ def custom_relu_static(func, device, dtype, np_x, use_func=True):
return out_v
def custom_relu_static_pe(func, device, dtype, np_x, use_func=True):
import paddle
import paddle.static as static
paddle.enable_static()
paddle.set_device(device)
places = paddle.CustomPlace("custom_cpu", 0)
with static.scope_guard(static.Scope()):
with static.program_guard(static.Program()):
x = static.data(name="X", shape=[None, 8], dtype=dtype)
x.stop_gradient = False
out = func(x) if use_func else paddle.nn.functional.relu(x)
static.append_backward(out)
exe = static.Executor()
exe.run(static.default_startup_program())
# in static mode, x data has been covered by out
compiled_prog = static.CompiledProgram(
static.default_main_program()
).with_data_parallel(loss_name=out.name, places=places)
out_v = exe.run(
compiled_prog, feed={"X": np_x}, fetch_list=[out.name]
)
paddle.disable_static()
return out_v
def custom_relu_double_grad_dynamic(func, device, dtype, np_x, use_func=True):
import paddle
......@@ -200,7 +169,6 @@ class TestNewCustomOpSetUpInstall(unittest.TestCase):
def test_custom_device(self):
self._test_static()
self._test_static_pe()
self._test_dynamic()
self._test_double_grad_dynamic()
self._test_with_dataloader()
......@@ -221,21 +189,6 @@ class TestNewCustomOpSetUpInstall(unittest.TestCase):
),
)
def _test_static_pe(self):
for dtype in self.dtypes:
x = np.random.uniform(-1, 1, [4, 8]).astype(dtype)
out = custom_relu_static_pe(self.custom_op, self.device, dtype, x)
pd_out = custom_relu_static_pe(
self.custom_op, self.device, dtype, x, False
)
np.testing.assert_array_equal(
out,
pd_out,
err_msg="custom op out: {},\n paddle api out: {}".format(
out, pd_out
),
)
def _test_dynamic(self):
for dtype in self.dtypes:
x = np.random.uniform(-1, 1, [4, 8]).astype(dtype)
......