diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt
index ec8b19c7ba07a9e57a32277ff3fc34b0ea25a819..e86af8b7ed94028ac077e4267645b8af0a1a5dfe 100644
--- a/python/paddle/fluid/tests/unittests/CMakeLists.txt
+++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt
@@ -107,7 +107,7 @@ if(WITH_DISTRIBUTE)
 endif()
 py_test_modules(test_parallel_executor_crf MODULES test_parallel_executor_crf SERIAL)
 py_test_modules(test_parallel_executor_fetch_feed MODULES test_parallel_executor_fetch_feed SERIAL)
-set_tests_properties(test_parallel_executor_fetch_feed PROPERTIES TIMEOUT 150)
+set_tests_properties(test_parallel_executor_fetch_feed PROPERTIES TIMEOUT 450)
 py_test_modules(test_parallel_executor_transformer MODULES test_parallel_executor_transformer SERIAL)
 if(NOT APPLE)
   py_test_modules(test_image_classification_resnet MODULES test_image_classification_resnet SERIAL)
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
index a49c5d9b43ae1bffa7cb57764db497f68030b151..06da1632f24cd4376356ff8e53169c7221b20b71 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py
@@ -14,13 +14,11 @@
 
 from __future__ import print_function
 
-import paddle.dataset.flowers as flowers
 import math
 import paddle.fluid as fluid
 import paddle.fluid.core as core
 import unittest
 import numpy as np
-import paddle
 import os
 
 
@@ -38,101 +36,82 @@ def Lenet(data, class_dim):
     return fc2
 
 
-class TestFetchOp(unittest.TestCase):
-    def parallel_exe(self, train_inputs, seed, use_cuda):
-        main = fluid.Program()
+class TestFetchAndFeed(unittest.TestCase):
+    def parallel_exe(self, use_cuda, run_parallel_exe, seed=1):
+        main_program = fluid.Program()
         startup = fluid.Program()
         startup.random_seed = seed
-        with fluid.program_guard(main, startup):
+        with fluid.program_guard(main_program, startup):
             data = fluid.layers.data(
                 name='image', shape=[3, 224, 224], dtype='float32')
             label = fluid.layers.data(name='label', shape=[1], dtype='int64')
             out = Lenet(data, class_dim=102)
             loss = fluid.layers.cross_entropy(input=out, label=label)
             loss = fluid.layers.mean(loss)
-
             opt = fluid.optimizer.Momentum(
                 learning_rate=0.1,
                 momentum=0.9,
                 regularization=fluid.regularizer.L2Decay(1e-4))
-
             opt.minimize(loss)
 
-        # TODO(zcd): I found that onece the memory optimizer is open,
-        # parallel_exe doesn't fetch some variable, such as conv2d_0.b_0@GRAD,
-        # conv2d_1.b_0@GRAD. Those variables should not be pruned.
-        # fluid.memory_optimize(main)
-
-        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-        exe = fluid.Executor(place)
-        exe.run(startup)
-
-        feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
-        pe = fluid.ParallelExecutor(
-            use_cuda=use_cuda, loss_name=loss.name, main_program=main)
-
-        fetch_list = []
-        all_vars = main.global_block().vars
-        for k, v in all_vars.items():
-            if 'tmp' not in k and k[0] is not '_' or v.persistable:
-                fetch_list.append(k)
-
-        for data in train_inputs:
-            ret = pe.run(fetch_list,
-                         feed=feeder.feed(data),
-                         return_numpy=True)
-            for i in range(len(fetch_list)):
-                assert not math.isnan(np.sum(ret[i])) and \
-                       not math.isinf(np.sum(ret[i]))
-
-    @unittest.skip(reason="CI timeout")
-    def test_fetch_op(self):
-        tst_reader = paddle.batch(flowers.test(use_xmap=False), batch_size=16)
-        tst_reader_iter = tst_reader()
-
-        iters = 3
-        train_inputs = []
-        for i in range(iters):
-            train_inputs.append(next(tst_reader_iter))
-
-        os.environ['CPU_NUM'] = str(4)
-        if core.is_compiled_with_cuda():
-            self.parallel_exe(train_inputs, seed=1, use_cuda=True)
-        self.parallel_exe(train_inputs, seed=1, use_cuda=False)
-
-
-class TestFeedParallel(unittest.TestCase):
-    def parallel_exe(self, use_cuda, seed):
-        main = fluid.Program()
-        startup = fluid.Program()
-        startup.random_seed = seed
-        with fluid.scope_guard(fluid.core.Scope()):
-            with fluid.program_guard(main, startup):
-                data = fluid.layers.data(
-                    name='image', shape=[3, 224, 224], dtype='float32')
-                label = fluid.layers.data(
-                    name='label', shape=[1], dtype='int64')
-                out = Lenet(data, class_dim=102)
-                loss = fluid.layers.cross_entropy(input=out, label=label)
-                loss = fluid.layers.mean(loss)
-                opt = fluid.optimizer.Momentum(
-                    learning_rate=0.1,
-                    momentum=0.9,
-                    regularization=fluid.regularizer.L2Decay(1e-4))
-
-                opt.minimize(loss)
-        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
-        feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
-        reader = feeder.decorate_reader(
-            paddle.batch(
-                flowers.train(), batch_size=16), multi_devices=True)
-        exe = fluid.Executor(place)
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+        exe = fluid.Executor(place)
         exe.run(startup)
+
         pe = fluid.ParallelExecutor(
-            use_cuda=use_cuda, loss_name=loss.name, main_program=main)
+            use_cuda=use_cuda, loss_name=loss.name, main_program=main_program)
+        run_parallel_exe(main_program, pe, use_cuda, data, label, loss)
+
+    def run_parallel_exe_with_fetch(self, main, pe, use_cuda, data, label,
+                                    loss):
+        def get_data(batch_size=8):
+            np.random.seed(5)
+            while True:
+                img = np.random.random(
+                    size=[batch_size, 3, 224, 224]).astype(np.float32)
+                l = (np.random.random(size=[batch_size, 1]) *
+                     10).astype(np.int64)
+                yield img, l
+
+        # TODO(zcd): I found that onece the memory optimizer is open,
+        # parallel_exe doesn't fetch some variable, such as conv2d_0.b_0@GRAD,
+        # conv2d_1.b_0@GRAD. Those variables should not be pruned.
+        # fluid.memory_optimize(main)
+        fetch_list = []
+        all_vars = main.global_block().vars
+
+        for k, v in all_vars.items():
+            if ('tmp' not in k) and (
+                    k[0] is not '_' or v.persistable
+            ) and v.type == core.VarDesc.VarType.LOD_TENSOR:
+                fetch_list.append(k)
+
+        for batch_id, img_label in enumerate(get_data()):
+            img, l = img_label
+            train_inputs = {data.name: img, label.name: l}
+            ret = pe.run(fetch_list, feed=train_inputs, return_numpy=True)
+            for i in range(len(fetch_list)):
+                assert not math.isnan(np.sum(ret[i])) and \
+                       not math.isinf(np.sum(ret[i]))
+            if batch_id == 2:
+                break
+
+    def run_parallel_exe_with_feed(self, main, pe, use_cuda, data, label, loss):
+        def get_data(batch_size=8):
+            np.random.seed(5)
+            while True:
+                train_data = []
+                for _ in range(batch_size):
+                    img = np.random.random(
+                        size=[1, 3, 224, 224]).astype(np.float32)
+                    label = (np.random.random(size=[1, 1]) *
+                             10).astype(np.int64)
+                    train_data.append([img, label])
+                yield train_data
+
+        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
+        feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
+        reader = feeder.decorate_reader(get_data, multi_devices=True)
 
         for batch_id, data in enumerate(reader()):
             loss_np = pe.run(feed=data, fetch_list=[loss.name])[0]
@@ -140,12 +119,22 @@ class TestFeedParallel(unittest.TestCase):
             if batch_id == 2:
                 break
 
-    @unittest.skip(reason="CI timeout")
-    def test_feed_op(self):
+    def test_fetch(self):
+        os.environ['CPU_NUM'] = str(4)
+        if core.is_compiled_with_cuda():
+            self.parallel_exe(
+                use_cuda=True,
+                run_parallel_exe=self.run_parallel_exe_with_fetch)
+        self.parallel_exe(
+            use_cuda=False, run_parallel_exe=self.run_parallel_exe_with_fetch)
+
+    def test_feed(self):
         os.environ['CPU_NUM'] = str(4)
         if core.is_compiled_with_cuda():
-            self.parallel_exe(use_cuda=True, seed=1)
-        self.parallel_exe(use_cuda=False, seed=1)
+            self.parallel_exe(
+                use_cuda=True, run_parallel_exe=self.run_parallel_exe_with_feed)
+        self.parallel_exe(
+            use_cuda=False, run_parallel_exe=self.run_parallel_exe_with_feed)
 
 
 if __name__ == '__main__':
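
Note: with this patch the test module no longer imports the flowers dataset; both TestFetchAndFeed cases feed synthetic numpy batches, so they can run offline. Below is a minimal sketch of how the reworked tests could be driven locally once the patch is applied; it assumes the working directory is python/paddle/fluid/tests/unittests and a Paddle build with fluid importable, and the CPU_NUM setting simply mirrors what test_fetch/test_feed already set for themselves:

    # run_fetch_feed_tests.py -- hypothetical helper script, not part of this patch
    import os
    import unittest

    # Same CPU device count the tests configure before building the ParallelExecutor.
    os.environ['CPU_NUM'] = '4'

    # Load every test method of the new TestFetchAndFeed case and run it.
    suite = unittest.defaultTestLoader.loadTestsFromName(
        'test_parallel_executor_fetch_feed.TestFetchAndFeed')
    unittest.TextTestRunner(verbosity=2).run(suite)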