import unittest
import paddle.v2.fluid as fluid
Y
Yang Yu 已提交
3 4 5 6 7 8 9 10 11 12 13 14
import numpy


class BaseParallelForTest(unittest.TestCase):
    """Base class for parallel_do tests.

    Runs a user-supplied network once per configuration — serial CPU,
    parallel CPU, and (when compiled with CUDA) serial GPU and parallel
    GPU — and asserts that the fetched results of all runs agree.
    """

    def main(self, callback, feed, fetch):
        """Run *callback* under every available place/parallel combination.

        Args:
            callback: a generator factory implementing the protocol used by
                `_main_impl_`: first yield produces the data variable(s),
                `send` patches the (possibly parallel) input back in, and the
                second yield produces the loss variable.
            feed: feed dict passed to the executor.
            fetch: a single variable name or a list of names to fetch.
        """
        cpu = fluid.CPUPlace()
        result_cpu = self._main_impl_(
            callback=callback,
            feed=feed,
            fetch=fetch,
            place=cpu,
            use_parallel=False)
        result_cpu_parallel = self._main_impl_(
            callback=callback,
            feed=feed,
            fetch=fetch,
            place=cpu,
            use_parallel=True)
        if fluid.core.is_compile_gpu():
            gpu = fluid.CUDAPlace(0)
            result_gpu = self._main_impl_(
                callback=callback,
                feed=feed,
                fetch=fetch,
                place=gpu,
                use_parallel=False)
            result_gpu_parallel = self._main_impl_(
                callback=callback,
                feed=feed,
                fetch=fetch,
                place=gpu,
                use_parallel=True)
            self._assert_same_(fetch, result_cpu, result_cpu_parallel,
                               result_gpu, result_gpu_parallel)
        else:
            self._assert_same_(fetch, result_cpu, result_cpu_parallel)

    def _main_impl_(self, callback, feed, fetch, place, use_parallel=False):
        """Build the network once and execute it; return the fetched values.

        Args:
            callback: generator factory (see `main`).
            feed: feed dict for the executor.
            fetch: fetch name or list of names.
            place: the fluid place to run on.
            use_parallel: wrap the network body in a ParallelDo block.

        Returns:
            The list of fetched numpy results from `Executor.run`.
        """
        # Normalize a single fetch name to a list. (Previously used the
        # Python-2-only `basestring`; this form is py2/py3 compatible.)
        if not isinstance(fetch, (list, tuple)):
            fetch = [fetch]
        main = fluid.Program()
        startup = fluid.Program()
        # Fix seed so serial and parallel runs are comparable.
        main.random_seed = 10
        startup.random_seed = 10

        with fluid.program_guard(main, startup):
            generator = callback()
            # Automatically insert parallel do if use_parallel = True
            if use_parallel:
                places = fluid.layers.get_places()
                pd = fluid.layers.ParallelDo(places)
                data = next(generator)

                if isinstance(data, fluid.Variable):
                    data = [data]

                with pd.do():
                    # Materialize as a list: `map` returns an iterator on
                    # Python 3, which would break the len()/indexing below.
                    ins = [pd.read_input(d) for d in data]
                    if len(ins) == 1:
                        ins = ins[0]
                    loss = generator.send(ins)  # patch input
                    pd.write_output(loss)

                loss = pd()
            else:
                data = next(generator)
                loss = generator.send(data)
            self.assertIsNotNone(loss)
            avg_loss = fluid.layers.mean(x=loss)
            fluid.backward.append_backward(loss=avg_loss)

        exe = fluid.Executor(place)
        exe.run(startup)
        return exe.run(main, feed=feed, fetch_list=fetch)

    def _assert_same_(self, fetch, *args):
        """Assert all result lists in *args* are element-wise close.

        Args:
            fetch: the fetch names, used only for error messages.
            *args: result lists in the fixed order CPU, ParallelCPU,
                GPU, ParallelGPU; later entries are compared against the
                first (serial CPU) baseline.
        """

        def _impl_(a, b, fetch_id, item_id):
            item_str = ['CPU', 'ParallelCPU', 'GPU', 'ParallelGPU']
            # Loose tolerance: parallel reduction order may change results.
            flag = numpy.allclose(a, b, rtol=0.1)
            self.assertTrue(flag, "The {0} are different in {1}".format(
                fetch[fetch_id], item_str[item_id]))

        for i, items in enumerate(zip(*args)):
            self.assertGreater(len(items), 0)
            for j in range(1, len(items)):
                _impl_(items[0], items[j], fetch_id=i, item_id=j)


class ParallelOpTest(BaseParallelForTest):
    """Checks parallel_do against serial execution on a small FC network."""

    def test_simple_fc(self):
        def __network__():
            # Data layer for the 784-dim flattened input images.
            img = fluid.layers.data(shape=[784], dtype='float32', name='img')
            # FIXME: This is a bug of parallel.do
            img.stop_gradient = False
            # Hand the data variable to the harness; receive the (possibly
            # parallel) input back via send().
            img = yield img
            hidden = fluid.layers.fc(input=img, size=200, param_attr='fc1.w')
            yield fluid.layers.mean(x=hidden)

        images = numpy.random.random(size=(128, 784)).astype('float32')
        self.main(
            callback=__network__,
            feed={'img': images},
            fetch='fc1.w@GRAD')


# Standard unittest entry point: run all test cases in this module.
if __name__ == '__main__':
    unittest.main()