From c7eb199b2e94292fc14b10cb75efd0b4bda47e56 Mon Sep 17 00:00:00 2001 From: Yang Yu Date: Wed, 10 Jan 2018 16:13:10 +0800 Subject: [PATCH] Init commit --- .../paddle/v2/fluid/tests/test_parallel_op.py | 108 +++++++++++------- 1 file changed, 68 insertions(+), 40 deletions(-) diff --git a/python/paddle/v2/fluid/tests/test_parallel_op.py b/python/paddle/v2/fluid/tests/test_parallel_op.py index 59ed041e7f..3736d5ea5a 100644 --- a/python/paddle/v2/fluid/tests/test_parallel_op.py +++ b/python/paddle/v2/fluid/tests/test_parallel_op.py @@ -1,45 +1,73 @@ import unittest - -import paddle.v2.fluid.layers as layers import paddle.v2.fluid as fluid -from paddle.v2.fluid.framework import Program -from paddle.v2.fluid.executor import Executor -from paddle.v2.fluid.backward import append_backward -import numpy as np -import paddle.v2.fluid.core as core - - -class ParallelOpTest(unittest.TestCase): - def setUp(self): - x = layers.data( - shape=[-1, 30, 40], - dtype='float32', - name='x', - append_batch_size=False, - stop_gradient=False) - - places = layers.get_places(device_count=4) - pd = layers.ParallelDo(places=places) - - with pd.do(): - data = pd.read_input(x) - hidden = layers.fc(input=data, size=7) - pd.write_output(hidden) - data = pd() - loss = layers.mean(x=data) - sgd_optimizer = fluid.optimizer.SGD(learning_rate=0.001) - sgd_optimizer.minimize(loss) - - exe = fluid.Executor(fluid.CPUPlace()) - exe.run(fluid.default_startup_program()) - exe.run(fluid.default_main_program(), - feed={ - x.name: np.random.uniform(0.1, 0.6, - (20, 30, 40)).astype("float32") - }) - - def test_forward(self): - pass +import numpy + + +class BaseParallelForTest(unittest.TestCase): + def main(self, callback, feed, fetch): + cpu = fluid.CPUPlace() + result_cpu = self._main_impl_( + callback=callback, + feed=feed, + fetch=fetch, + place=cpu, + use_parallel=False) + print result_cpu + + def _main_impl_(self, callback, feed, fetch, place, use_parallel=False): + main = fluid.Program() + startup = fluid.Program() + # Fix seed + main.random_seed = 10 + startup.random_seed = 10 + + with fluid.program_guard(main, startup): + generator = callback() + # Automatically insert parallel do if use_parallel = True + if use_parallel: + places = fluid.layers.get_places() + pd = fluid.layers.ParallelDo(places) + data = next(generator) + + if isinstance(data, fluid.Variable): + data = [data] + with pd.do(): + ins = map(pd.read_input, data) + if len(ins) == 1: + ins = ins[0] + generator.send(ins) # patch input + loss = next(generator) + pd.write_output(loss) + + loss = pd() + else: + data = next(generator) + generator.send(data) + loss = next(generator) + + avg_loss = fluid.layers.mean(x=loss) + fluid.backward.append_backward(loss=avg_loss) + + exe = fluid.Executor(place) + exe.run(startup) + return exe.run(main, feed=feed, fetch_list=fetch) + + +class ParallelOpTest(BaseParallelForTest): + def test_simple_fc(self): + def __network__(): + x = fluid.layers.data(shape=[784], dtype='float32', name='img') + x = yield x + hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w') + loss = fluid.layers.mean(x=hidden) + yield loss + + self.main( + callback=__network__, + feed={ + 'img': numpy.random.random(size=(128, 784)).astype('float32') + }, + fetch='fc1.w@GRAD') if __name__ == '__main__': -- GitLab