未验证 提交 7a156f18 编写于 作者: K kangguangli 提交者: GitHub

[with_data_parallel][part2] remove with_data_parallel in unit test (#50501)

* process unit test matched test_p*

* fix ci bug

* fix ci bugs

* fix codestyle

* remove all tests about pe and restore some irrelated tests

* delete test_parallel_executor_test_while_train.py

* assert len(self.places) == 1

* remove with_data_parallel in unittest

* remove multi-card related checks

* remove with_data_parallel in test_multiprocess_dataloader*

* fix error hint

* fix typo
上级 9b0b155b
......@@ -324,6 +324,11 @@ class CompiledProgram:
if self._places is not None:
if not isinstance(self._places, (list, tuple)):
self._places = [self._places]
if self._places is not None and len(self._places) > 1:
raise NotImplementedError(
"If you need to train with multi-gpus, please use `fleet` instead of `with_data_parallel`."
"This will be removed soon in develop version."
)
return self
......
......@@ -98,10 +98,8 @@ class InplaceTestBase(unittest.TestCase):
build_strategy.fuse_all_optimizer_ops = (
self.fuse_all_optimizer_ops
)
compiled_prog = fluid.CompiledProgram(prog).with_data_parallel(
loss_name=loss.name,
build_strategy=build_strategy,
places=self.place,
compiled_prog = fluid.CompiledProgram(
prog, build_strategy=build_strategy
)
compiled_programs.append(compiled_prog)
......@@ -133,74 +131,12 @@ class InplaceTestBase(unittest.TestCase):
),
)
def check_multi_card_fetch_var(self):
if self.is_invalid_test():
return
prog1, scope1, exe, loss1 = self.build_program_and_scope()
scopes = []
compiled_programs = []
if self.use_cuda:
places = fluid.cuda_places()
else:
places = fluid.cpu_places(self.device_count)
for memory_optimize in [False, True]:
for enable_inplace in [False, True]:
prog, scope, _, loss = self.build_program_and_scope()
scopes.append(scope)
build_strategy = fluid.BuildStrategy()
build_strategy.memory_optimize = memory_optimize
build_strategy.enable_inplace = enable_inplace
build_strategy.fuse_all_optimizer_ops = (
self.fuse_all_optimizer_ops
)
compiled_program = fluid.CompiledProgram(
prog
).with_data_parallel(
loss_name=loss.name,
build_strategy=build_strategy,
places=places,
)
compiled_programs.append(compiled_program)
repeated_var_names = self.get_all_vars(prog1)
random.shuffle(repeated_var_names) # add some random
for fetch_var in repeated_var_names[:4]:
for _ in range(2):
fetch_vals = []
for scope, compiled_prog in zip(scopes, compiled_programs):
with fluid.scope_guard(scope):
(fetch_val,) = exe.run(
compiled_prog,
feed=feed_dict,
fetch_list=[fetch_var],
)
fetch_vals.append(fetch_val)
for item in fetch_vals:
np.testing.assert_array_equal(fetch_vals[0], item)
np.testing.assert_array_equal(
fetch_vals[0],
item,
err_msg='error var name: {}, fetch_vals[0]: {}, item: {}'.format(
fetch_var,
fetch_vals[0][~np.equal(fetch_vals[0], item)],
item[~np.equal(fetch_vals[0], item)],
),
)
class CUDAInplaceTest(InplaceTestBase):
def initParameter(self):
self.use_cuda = True
self.fuse_all_optimizer_ops = False
def test_multi_card_fetch_var(self):
self.check_multi_card_fetch_var()
def test_single_card_fetch_var(self):
self.check_single_card_fetch_var()
......@@ -210,9 +146,6 @@ class CPUInplaceTest(InplaceTestBase):
self.use_cuda = False
self.fuse_all_optimizer_ops = False
def test_multi_card_fetch_var(self):
self.check_multi_card_fetch_var()
def test_single_card_fetch_var(self):
self.check_single_card_fetch_var()
......
......@@ -23,9 +23,6 @@ class CUDAInplaceTestWithFuseOptimizationOps(InplaceTestBase):
self.fuse_all_optimizer_ops = True
self.fuse_all_reduce_ops = False
def test_multi_card_fetch_var(self):
self.check_multi_card_fetch_var()
def test_single_card_fetch_var(self):
self.check_single_card_fetch_var()
......@@ -36,9 +33,6 @@ class CPUInplaceTestWithFuseOptimizationOps(InplaceTestBase):
self.fuse_all_optimizer_ops = True
self.fuse_all_reduce_ops = False
def test_multi_card_fetch_var(self):
self.check_multi_card_fetch_var()
# TODO(zcd): should check why this test failed.
@unittest.skip("should fix this later.")
def test_single_card_fetch_var(self):
......
......@@ -81,23 +81,19 @@ class DataLoaderKeepOrderTestBase(unittest.TestCase):
start_val += 1
def get_places(self):
place_list = [fluid.cpu_places(1), fluid.cpu_places(4)]
place_list = [fluid.cpu_places(1)]
if fluid.is_compiled_with_cuda():
if os.name == "nt":
place_list.extend([fluid.cuda_places(0)])
else:
place_list.extend(
[fluid.cuda_places(0), fluid.cuda_places([0, 1])]
)
place_list.extend([fluid.cuda_places(0)])
return place_list
def test_main(self):
for p in self.get_places():
use_compiled_program_list = [True] if len(p) > 1 else [False, True]
for use_compiled_program in use_compiled_program_list:
self.run_main_with_place(p, use_compiled_program)
self.run_main_with_place(p)
def run_main_with_place(self, places, use_compiled_program=True):
def run_main_with_place(self, places):
with fluid.scope_guard(fluid.Scope()):
with fluid.program_guard(fluid.Program(), fluid.Program()):
input_data, loss, loader = self.build_network(places)
......@@ -107,14 +103,9 @@ class DataLoaderKeepOrderTestBase(unittest.TestCase):
exe.run(fluid.default_startup_program())
dev_cnt = len(places)
if dev_cnt > 1:
self.assertTrue(use_compiled_program)
self.assertTrue(dev_cnt == 1)
main_program = fluid.default_main_program()
if use_compiled_program:
main_program = fluid.CompiledProgram(
main_program
).with_data_parallel(loss_name=loss.name, places=places)
max_batch_num = min(
self.break_num, int(self.batch_num / dev_cnt)
......
......@@ -101,23 +101,19 @@ class DataLoaderKeepOrderTestBase(unittest.TestCase):
start_val += 1
def get_places(self):
place_list = [fluid.cpu_places(1), fluid.cpu_places(4)]
place_list = [fluid.cpu_places(1)]
if fluid.is_compiled_with_cuda():
if os.name == "nt":
place_list.extend([fluid.cuda_places(0)])
else:
place_list.extend(
[fluid.cuda_places(0), fluid.cuda_places([0, 1])]
)
place_list.extend([fluid.cuda_places(0)])
return place_list
def test_main(self):
for p in self.get_places():
use_compiled_program_list = [True] if len(p) > 1 else [False, True]
for use_compiled_program in use_compiled_program_list:
self.run_main_with_place(p, use_compiled_program)
self.run_main_with_place(p)
def run_main_with_place(self, places, use_compiled_program=True):
def run_main_with_place(self, places):
with fluid.scope_guard(fluid.Scope()):
with fluid.program_guard(fluid.Program(), fluid.Program()):
input_data, loss, loader = self.build_network(places)
......@@ -127,14 +123,9 @@ class DataLoaderKeepOrderTestBase(unittest.TestCase):
exe.run(fluid.default_startup_program())
dev_cnt = len(places)
if dev_cnt > 1:
self.assertTrue(use_compiled_program)
self.assertTrue(dev_cnt == 1)
main_program = fluid.default_main_program()
if use_compiled_program:
main_program = fluid.CompiledProgram(
main_program
).with_data_parallel(loss_name=loss.name, places=places)
max_batch_num = min(
self.break_num, int(self.batch_num / dev_cnt)
......
......@@ -86,7 +86,6 @@ class TestBase(unittest.TestCase):
def run_main(
self,
use_legacy_py_reader,
with_data_parallel,
places,
use_double_buffer,
):
......@@ -108,10 +107,6 @@ class TestBase(unittest.TestCase):
exe.run(startup_prog)
prog = fluid.CompiledProgram(main_prog)
if with_data_parallel:
prog = prog.with_data_parallel(
loss_name=loss.name, places=places
)
step = 0
step_list = []
......@@ -163,39 +158,33 @@ class TestBase(unittest.TestCase):
}
return ret
def prepare_places(self, with_data_parallel, with_cpu=True, with_gpu=True):
def prepare_places(self, with_cpu=True, with_gpu=True):
places = []
if with_cpu:
places.append([fluid.CPUPlace()])
if with_data_parallel:
places.append([fluid.CPUPlace()] * 2)
if with_gpu and fluid.core.is_compiled_with_cuda():
tmp = fluid.cuda_places()
assert len(tmp) > 0, "no gpu detected"
if with_data_parallel:
places.append(tmp)
places.append([tmp[0]])
return places
def test_main(self):
for with_data_parallel in [True, False]:
for p in self.prepare_places(with_data_parallel):
for use_double_buffer in [False, True]:
results = []
for use_legacy_py_reader in [False, True]:
ret = self.run_main(
use_legacy_py_reader=use_legacy_py_reader,
with_data_parallel=with_data_parallel,
places=p,
use_double_buffer=use_double_buffer,
)
results.append(ret)
if not use_double_buffer:
diff = np.max(
np.abs(results[0]['loss'] - results[1]['loss'])
)
self.assertLess(diff, 1e-3)
for p in self.prepare_places():
for use_double_buffer in [False, True]:
results = []
for use_legacy_py_reader in [False, True]:
ret = self.run_main(
use_legacy_py_reader=use_legacy_py_reader,
places=p,
use_double_buffer=use_double_buffer,
)
results.append(ret)
if not use_double_buffer:
diff = np.max(
np.abs(results[0]['loss'] - results[1]['loss'])
)
self.assertLess(diff, 1e-3)
if __name__ == '__main__':
......
......@@ -87,7 +87,6 @@ class TestBase(unittest.TestCase):
def run_main(
self,
use_legacy_py_reader,
with_data_parallel,
places,
use_double_buffer,
):
......@@ -109,10 +108,6 @@ class TestBase(unittest.TestCase):
exe.run(startup_prog)
prog = fluid.CompiledProgram(main_prog)
if with_data_parallel:
prog = prog.with_data_parallel(
loss_name=loss.name, places=places
)
step = 0
step_list = []
......@@ -166,40 +161,34 @@ class TestBase(unittest.TestCase):
}
return ret
def prepare_places(self, with_data_parallel, with_cpu=True, with_gpu=True):
def prepare_places(self, with_cpu=True, with_gpu=True):
places = []
if with_cpu:
places.append([fluid.CPUPlace()])
if with_data_parallel:
places.append([fluid.CPUPlace()] * 2)
if with_gpu and fluid.core.is_compiled_with_cuda():
tmp = fluid.cuda_places()
assert len(tmp) > 0, "no gpu detected"
if with_data_parallel:
places.append(tmp)
places.append([tmp[0]])
return places
def test_main(self):
for with_data_parallel in [True, False]:
for p in self.prepare_places(with_data_parallel):
for use_double_buffer in [False, True]:
results = []
for use_legacy_py_reader in [False, True]:
print(p, use_double_buffer, use_legacy_py_reader)
ret = self.run_main(
use_legacy_py_reader=use_legacy_py_reader,
with_data_parallel=with_data_parallel,
places=p,
use_double_buffer=use_double_buffer,
)
results.append(ret)
if not use_double_buffer:
diff = np.max(
np.abs(results[0]['loss'] - results[1]['loss'])
)
self.assertLess(diff, 1e-3)
for p in self.prepare_places():
for use_double_buffer in [False, True]:
results = []
for use_legacy_py_reader in [False, True]:
print(p, use_double_buffer, use_legacy_py_reader)
ret = self.run_main(
use_legacy_py_reader=use_legacy_py_reader,
places=p,
use_double_buffer=use_double_buffer,
)
results.append(ret)
if not use_double_buffer:
diff = np.max(
np.abs(results[0]['loss'] - results[1]['loss'])
)
self.assertLess(diff, 1e-3)
class TestDataLoaderBaseAbstract(unittest.TestCase):
......
......@@ -18,7 +18,6 @@ import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.core as core
......@@ -508,54 +507,28 @@ class TestLRScheduler(unittest.TestCase):
num += 1
if isinstance(place, paddle.CPUPlace):
compiled_train_prog = paddle.static.CompiledProgram(
main_prog
).with_data_parallel(
loss_name=loss.name, places=fluid.cpu_places(4)
)
compiled_train_prog = main_prog
for epoch in range(5):
python_result = python_func(num, **kwarg)
for batch_id in range(2):
_ = exe.run(
out = exe.run(
compiled_train_prog,
feed={'x': np.random.randn(12, 4, 5).astype('float32')},
feed={'x': np.random.randn(3, 4, 5).astype('float32')},
fetch_list=lr_var.name,
)
scopes = compiled_train_prog._executor.local_scopes()
out = np.array(scopes[0].var(lr_var.name).get_tensor())
self.assertEqual(out, np.array(python_result))
out = np.array(scopes[1].var(lr_var.name).get_tensor())
self.assertEqual(out, np.array(python_result))
out = np.array(scopes[2].var(lr_var.name).get_tensor())
self.assertEqual(out, np.array(python_result))
out = np.array(scopes[3].var(lr_var.name).get_tensor())
self.assertEqual(out, np.array(python_result))
scheduler.step()
num += 1
compiled_test_prog = paddle.static.CompiledProgram(
test_prog
).with_data_parallel(
loss_name=loss.name,
share_vars_from=compiled_train_prog,
places=fluid.cpu_places(4),
)
compiled_test_prog = test_prog
for epoch in range(5):
python_result = python_func(num, **kwarg)
for batch_id in range(2):
_ = exe.run(
out = exe.run(
compiled_test_prog,
feed={'x': np.random.randn(12, 4, 5).astype('float32')},
feed={'x': np.random.randn(3, 4, 5).astype('float32')},
fetch_list=lr_var.name,
)
scopes = compiled_test_prog._executor.local_scopes()
out = np.array(scopes[0].var(lr_var.name).get_tensor())
self.assertEqual(out, np.array(python_result))
out = np.array(scopes[1].var(lr_var.name).get_tensor())
self.assertEqual(out, np.array(python_result))
out = np.array(scopes[2].var(lr_var.name).get_tensor())
self.assertEqual(out, np.array(python_result))
out = np.array(scopes[3].var(lr_var.name).get_tensor())
self.assertEqual(out, np.array(python_result))
scheduler.step()
num += 1
......
......@@ -122,7 +122,7 @@ class TestDygraphDataLoader(unittest.TestCase):
def test_main(self):
# dynamic graph do not run with_data_parallel
for p in prepare_places(False):
for p in prepare_places():
for persistent_workers in [False, True]:
results = []
for num_workers in [0, 2]:
......
......@@ -122,7 +122,7 @@ class TestDygraphDataLoader(unittest.TestCase):
def test_main(self):
# dynamic graph do not run with_data_parallel
for p in prepare_places(False):
for p in prepare_places():
for persistent_workers in [False, True]:
results = []
for num_workers in [0, 2]:
......
......@@ -93,18 +93,14 @@ def simple_fc_net_static():
return startup_prog, main_prog, image, label, loss
def prepare_places(with_data_parallel, with_cpu=False, with_gpu=True):
def prepare_places(with_cpu=False, with_gpu=True):
places = []
if with_cpu:
places.append([fluid.CPUPlace()])
if with_data_parallel:
places.append([fluid.CPUPlace()] * 2)
if with_gpu and fluid.core.is_compiled_with_cuda():
tmp = fluid.cuda_places()[:2]
assert len(tmp) > 0, "no gpu detected"
if with_data_parallel and len(tmp) > 1:
places.append(tmp)
places.append([tmp[0]])
return places
......@@ -132,10 +128,6 @@ class TestStaticDataLoader(unittest.TestCase):
exe.run(startup_prog)
prog = fluid.CompiledProgram(main_prog)
if len(places) > 1:
prog = prog.with_data_parallel(
loss_name=loss.name, places=places
)
step_list = []
loss_list = []
......@@ -173,7 +165,7 @@ class TestStaticDataLoader(unittest.TestCase):
return ret
def test_main(self):
for p in prepare_places(True):
for p in prepare_places():
for persistent_workers in [False, True]:
results = []
for num_workers in [0, 2]:
......@@ -237,11 +229,7 @@ class TestStaticDataLoaderWithBatchedDataset(TestStaticDataLoader):
exe = fluid.Executor(place=places[0])
exe.run(startup_prog)
prog = fluid.CompiledProgram(main_prog)
if len(places) > 1:
prog = prog.with_data_parallel(
loss_name=loss.name, places=places
)
prog = main_prog
step_list = []
loss_list = []
......
......@@ -93,24 +93,20 @@ def simple_fc_net_static():
return startup_prog, main_prog, image, label, loss
def prepare_places(with_data_parallel, with_cpu=False, with_gpu=True):
def prepare_places(with_cpu=False, with_gpu=True):
places = []
if with_cpu:
places.append([fluid.CPUPlace()])
if with_data_parallel:
places.append([fluid.CPUPlace()] * 2)
if with_gpu and fluid.core.is_compiled_with_cuda():
tmp = fluid.cuda_places()[:2]
assert len(tmp) > 0, "no gpu detected"
if with_data_parallel and len(tmp) > 1:
places.append(tmp)
places.append([tmp[0]])
return places
class TestStaticDataLoader(unittest.TestCase):
def run_main(self, num_workers, places, persistent_workers, use_pe=True):
def run_main(self, num_workers, places, persistent_workers):
scope = fluid.Scope()
with fluid.scope_guard(scope):
startup_prog, main_prog, image, label, loss = simple_fc_net_static()
......@@ -131,14 +127,7 @@ class TestStaticDataLoader(unittest.TestCase):
exe = fluid.Executor(place=places[0])
exe.run(startup_prog)
if use_pe:
prog = fluid.CompiledProgram(main_prog)
if len(places) > 1:
prog = prog.with_data_parallel(
loss_name=loss.name, places=places
)
else:
prog = main_prog
prog = main_prog
step_list = []
loss_list = []
......@@ -176,7 +165,7 @@ class TestStaticDataLoader(unittest.TestCase):
return ret
def test_main(self):
for p in prepare_places(True):
for p in prepare_places():
for persistent_workers in [True, False]:
results = []
for num_workers in [0, 2]:
......@@ -300,10 +289,6 @@ class TestStaticDataLoaderWithBatchedDataset(TestStaticDataLoader):
exe.run(startup_prog)
prog = fluid.CompiledProgram(main_prog)
if len(places) > 1:
prog = prog.with_data_parallel(
loss_name=loss.name, places=places
)
step_list = []
loss_list = []
......
......@@ -156,28 +156,10 @@ class TestUnaryAPI(unittest.TestCase):
self.assertEqual(item.shape, ())
# 2) Test CompiledProgram Program
if paddle.device.is_compiled_with_cuda():
places = [paddle.CUDAPlace(0)]
expect_shape = ()
else:
places = [paddle.CPUPlace()] * 4
expect_shape = (4,)
compile_prog = paddle.static.CompiledProgram(
main_prog
).with_data_parallel(loss.name, places=places)
# return_merged=False #
res = exe.run(
compile_prog, fetch_list=fetch_list, return_merged=False
)
for item1 in res:
for item2 in item1:
self.assertEqual(item2.shape, ())
# return_merged=True #
res = exe.run(
compile_prog, fetch_list=fetch_list, return_merged=True
)
expect_shape = ()
compile_prog = paddle.static.CompiledProgram(main_prog)
res = exe.run(compile_prog, fetch_list=fetch_list)
for item in res:
self.assertEqual(item.shape, expect_shape)
......@@ -3296,28 +3278,10 @@ class TestUnaryElementwiseAPIWithComplexInput(unittest.TestCase):
self.assertEqual(item.shape, ())
# 2) Test CompiledProgram Program
if paddle.device.is_compiled_with_cuda():
places = [paddle.CUDAPlace(0)]
expect_shape = ()
else:
places = [paddle.CPUPlace()] * 4
expect_shape = (4,)
compile_prog = paddle.static.CompiledProgram(
main_prog
).with_data_parallel(loss.name, places=places)
# return_merged=False #
res = exe.run(
compile_prog, fetch_list=fetch_list, return_merged=False
)
for item1 in res:
for item2 in item1:
self.assertEqual(item2.shape, ())
# return_merged=True #
res = exe.run(
compile_prog, fetch_list=fetch_list, return_merged=True
)
expect_shape = ()
compile_prog = paddle.static.CompiledProgram(main_prog)
res = exe.run(compile_prog, fetch_list=fetch_list)
for item in res:
self.assertEqual(item.shape, expect_shape)
......@@ -3380,28 +3344,11 @@ class TestAsReal(unittest.TestCase):
self.assertEqual(res[3].shape, (2,))
# 2) Test CompiledProgram Program
if paddle.device.is_compiled_with_cuda():
places = [paddle.CUDAPlace(0)]
expect_shapes = (), (2,), (), (2,)
else:
places = [paddle.CPUPlace()] * 4
expect_shapes = (4,), (8,), (4,), (8,)
compile_prog = paddle.static.CompiledProgram(
main_prog
).with_data_parallel(loss.name, places=places)
# return_merged=False #
res = exe.run(
compile_prog, fetch_list=fetch_list, return_merged=False
)
for out_i, expect in zip(res, [(), (2,), (), (2,)]):
for replica in out_i:
self.assertEqual(replica.shape, expect)
# return_merged=True #
res = exe.run(
compile_prog, fetch_list=fetch_list, return_merged=True
)
expect_shapes = (), (2,), (), (2,)
compile_prog = paddle.static.CompiledProgram(main_prog)
res = exe.run(compile_prog, fetch_list=fetch_list)
print(res)
for actual, expect in zip(res, expect_shapes):
self.assertEqual(actual.shape, expect)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册