From 2412f2f4124250ef0a0b8eb66efbfa08900c5d74 Mon Sep 17 00:00:00 2001 From: Yang Yu Date: Wed, 10 Jan 2018 16:37:42 +0800 Subject: [PATCH] Polish Unittest for ParallelFor --- .../paddle/v2/fluid/tests/test_parallel_op.py | 50 ++++++++++++++++--- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py index 3736d5ea5a5..049ae0fe28d 100644 --- a/python/paddle/v2/fluid/tests/test_parallel_op.py +++ b/python/paddle/v2/fluid/tests/test_parallel_op.py @@ -12,9 +12,34 @@ class BaseParallelForTest(unittest.TestCase): fetch=fetch, place=cpu, use_parallel=False) - print result_cpu + result_cpu_parallel = self._main_impl_( + callback=callback, + feed=feed, + fetch=fetch, + place=cpu, + use_parallel=True) + if fluid.core.is_compile_gpu(): + gpu = fluid.CUDAPlace(0) + result_gpu = self._main_impl_( + callback=callback, + feed=feed, + fetch=fetch, + place=gpu, + use_parallel=False) + result_gpu_parallel = self._main_impl_( + callback=callback, + feed=feed, + fetch=fetch, + place=gpu, + use_parallel=True) + self._assert_same_(fetch, result_cpu, result_cpu_parallel, + result_gpu, result_gpu_parallel) + else: + self._assert_same_(fetch, result_cpu, result_cpu_parallel) def _main_impl_(self, callback, feed, fetch, place, use_parallel=False): + if isinstance(fetch, basestring): + fetch = [fetch] main = fluid.Program() startup = fluid.Program() # Fix seed @@ -31,20 +56,19 @@ class BaseParallelForTest(unittest.TestCase): if isinstance(data, fluid.Variable): data = [data] + with pd.do(): ins = map(pd.read_input, data) if len(ins) == 1: ins = ins[0] - generator.send(ins) # patch input - loss = next(generator) + loss = generator.send(ins) # patch input pd.write_output(loss) loss = pd() else: data = next(generator) - generator.send(data) - loss = next(generator) - + loss = generator.send(data) + self.assertIsNotNone(loss) avg_loss = fluid.layers.mean(x=loss) fluid.backward.append_backward(loss=avg_loss) @@ -52,11 +76,25 @@ class BaseParallelForTest(unittest.TestCase): exe.run(startup) return exe.run(main, feed=feed, fetch_list=fetch) + def _assert_same_(self, fetch, *args): + def _impl_(a, b, fetch_id, item_id): + item_str = ['CPU', 'ParallelCPU', 'GPU', 'ParallelGPU'] + flag = numpy.allclose(a, b, rtol=0.1) + self.assertTrue(flag, "The {0} are different in {1}".format( + fetch[fetch_id], item_str[item_id])) + + for i, items in enumerate(zip(*args)): + self.assertGreater(len(items), 0) + for j in range(1, len(items)): + _impl_(items[0], items[j], fetch_id=i, item_id=j) + class ParallelOpTest(BaseParallelForTest): def test_simple_fc(self): def __network__(): x = fluid.layers.data(shape=[784], dtype='float32', name='img') + # FIXME: This is a bug of parallel.do + x.stop_gradient = False x = yield x hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w') loss = fluid.layers.mean(x=hidden) -- GitLab