test_parallel_op.py 7.8 KB
Newer Older
1
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
D
dzhwinter 已提交
2
#
D
dzhwinter 已提交
3 4 5
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
D
dzhwinter 已提交
6
#
D
dzhwinter 已提交
7
#     http://www.apache.org/licenses/LICENSE-2.0
D
dzhwinter 已提交
8
#
D
dzhwinter 已提交
9 10 11 12 13 14
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

15 16
from __future__ import print_function

Y
Yang Yang 已提交
17
import unittest
18

19
import paddle.fluid as fluid
20
from paddle.fluid.layers.device import get_places
21
import paddle.fluid.profiler as profiler
Y
Yang Yu 已提交
22
import numpy
23
import six
Y
Yang Yu 已提交
24 25 26


class BaseParallelForTest(unittest.TestCase):
Y
Yang Yu 已提交
27 28 29 30
    def run_test(self, callback, feed, fetch):
        """
        Run the unittest for parallel.for
        Args:
31 32 33 34
            callback(callable): A callable function returns a generator. There
                are two yields in the generator function. The first yield
                returns the data layers, and the second yield returns the loss.
                The modified data variables will be sent back during the first
Y
Yang Yu 已提交
35
                yield.
36

Y
Yang Yu 已提交
37
            feed(dict): The executor feeding dictionary.
38
            fetch(list|basestr): The fetch name lists.
Y
Yang Yu 已提交
39 40 41

        Returns:
            None
42

Y
Yang Yu 已提交
43
        Raises:
44
            AssertionError when the computation of cpu, parallel.for in cpu,
Y
Yang Yu 已提交
45 46 47
                gpu, parallel.for in gpu are different.

        """
Y
Yang Yu 已提交
48
        cpu = fluid.CPUPlace()
Y
Yang Yu 已提交
49
        result_cpu = self._run_test_impl_(
Y
Yang Yu 已提交
50 51 52 53 54
            callback=callback,
            feed=feed,
            fetch=fetch,
            place=cpu,
            use_parallel=False)
Y
Yang Yu 已提交
55
        result_cpu_parallel = self._run_test_impl_(
Y
Yang Yu 已提交
56 57 58 59 60
            callback=callback,
            feed=feed,
            fetch=fetch,
            place=cpu,
            use_parallel=True)
61
        if fluid.core.is_compiled_with_cuda():
Y
Yang Yu 已提交
62
            gpu = fluid.CUDAPlace(0)
Y
Yang Yu 已提交
63
            result_gpu = self._run_test_impl_(
Y
Yang Yu 已提交
64 65 66 67
                callback=callback,
                feed=feed,
                fetch=fetch,
                place=gpu,
68 69
                use_parallel=False,
                use_gpu=True)
Y
Yang Yu 已提交
70
            result_gpu_parallel = self._run_test_impl_(
Y
Yang Yu 已提交
71 72 73 74
                callback=callback,
                feed=feed,
                fetch=fetch,
                place=gpu,
75 76
                use_parallel=True,
                use_gpu=True)
Y
Yang Yang 已提交
77 78 79 80 81 82
            result_gpu_nccl = self._run_test_impl_(
                callback=callback,
                feed=feed,
                fetch=fetch,
                place=gpu,
                use_parallel=True,
83 84
                use_nccl=True,
                use_gpu=True)
Y
Yang Yu 已提交
85
            self._assert_same_(fetch, result_cpu, result_cpu_parallel,
Y
Yang Yang 已提交
86
                               result_gpu, result_gpu_parallel, result_gpu_nccl)
Y
Yang Yu 已提交
87 88
        else:
            self._assert_same_(fetch, result_cpu, result_cpu_parallel)
Y
Yang Yu 已提交
89

Y
Yang Yang 已提交
90 91 92 93 94 95
    def _run_test_impl_(self,
                        callback,
                        feed,
                        fetch,
                        place,
                        use_parallel=False,
96 97
                        use_nccl=False,
                        use_gpu=False):
Y
Yang Yu 已提交
98 99 100
        """
        Run a single test, returns the fetch values
        Args:
101 102
            place(Place): the computation place.
            use_parallel(bool): Whether use parallel.for or not.
Y
Yang Yu 已提交
103 104 105 106 107

        Returns:
            Fetched numpy arrays.

        """
108
        if isinstance(fetch, six.string_types):
Y
Yang Yu 已提交
109
            fetch = [fetch]
Y
Yang Yu 已提交
110 111 112 113 114 115 116 117 118 119
        main = fluid.Program()
        startup = fluid.Program()
        # Fix seed
        main.random_seed = 10
        startup.random_seed = 10

        with fluid.program_guard(main, startup):
            generator = callback()
            # Automatically insert parallel do if use_parallel = True
            if use_parallel:
120 121
                thread_num = fluid.core.get_cuda_device_count(
                ) if use_gpu else 8
122
                places = get_places(thread_num)
Y
Yang Yang 已提交
123
                pd = fluid.layers.ParallelDo(places, use_nccl=use_nccl)
Y
Yang Yu 已提交
124 125
                data = next(generator)

W
Wu Yi 已提交
126
                if isinstance(data, fluid.framework.Variable):
Y
Yang Yu 已提交
127
                    data = [data]
Y
Yang Yu 已提交
128

Y
Yang Yu 已提交
129
                with pd.do():
130
                    ins = list(map(pd.read_input, data))
Y
Yang Yu 已提交
131 132
                    if len(ins) == 1:
                        ins = ins[0]
Y
Yang Yu 已提交
133
                    loss = generator.send(ins)  # patch input
Y
Yang Yu 已提交
134 135 136 137 138
                    pd.write_output(loss)

                loss = pd()
            else:
                data = next(generator)
Y
Yang Yu 已提交
139 140
                loss = generator.send(data)
            self.assertIsNotNone(loss)
Y
Yu Yang 已提交
141
            avg_loss = fluid.layers.mean(loss)
Y
Yang Yu 已提交
142 143 144 145
            fluid.backward.append_backward(loss=avg_loss)

        exe = fluid.Executor(place)
        exe.run(startup)
146 147 148 149 150 151
        if use_gpu:
            profile_type = 'GPU'
        else:
            profile_type = 'CPU'
        with profiler.profiler(profile_type, 'total', '/tmp/profiler'):
            return exe.run(main, feed=feed, fetch_list=fetch)
Y
Yang Yu 已提交
152

Y
Yang Yu 已提交
153
    def _assert_same_(self, fetch, *args):
Y
Yang Yu 已提交
154 155 156 157 158 159 160 161
        """
        Assert the return values of `run_test` are same.
        Args:
            fetch: Fetch list. Used for print error message
            *args: The fetch result lists of each situations.

        Returns:
            None
162

Y
Yang Yu 已提交
163 164 165 166 167
        Raises:
            AssertionError

        """

Y
Yang Yu 已提交
168
        def _impl_(a, b, fetch_id, item_id):
Y
Yang Yang 已提交
169 170 171
            item_str = [
                'CPU', 'ParallelCPU', 'GPU', 'ParallelGPU', 'ParallelGPUNCCL'
            ]
172 173 174 175
            flag = numpy.allclose(a, b, rtol=0.1, atol=1e-3)
            self.assertTrue(flag,
                            "The {0} are different in {1}, {2} vs {3}".format(
                                fetch[fetch_id], item_str[item_id], a, b))
Y
Yang Yu 已提交
176 177 178 179 180 181

        for i, items in enumerate(zip(*args)):
            self.assertGreater(len(items), 0)
            for j in range(1, len(items)):
                _impl_(items[0], items[j], fetch_id=i, item_id=j)

Y
Yang Yu 已提交
182 183

class ParallelOpTest(BaseParallelForTest):
Y
Yu Yang 已提交
184 185 186 187 188
    @staticmethod
    def __network__():
        x = fluid.layers.data(shape=[784], dtype='float32', name='img')
        x = yield x
        hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
Y
Yu Yang 已提交
189
        hidden = fluid.layers.batch_norm(input=hidden)
Y
Yu Yang 已提交
190
        loss = fluid.layers.mean(hidden)
Y
Yu Yang 已提交
191
        yield loss
Y
Yang Yu 已提交
192

Y
Yang Yang 已提交
193
    def test_simple_fc(self):
Y
Yu Yang 已提交
194
        self.run_test(
Y
Yang Yang 已提交
195
            callback=self.__network__,
Y
Yang Yang 已提交
196 197 198
            feed={
                'img': numpy.random.random(size=(51, 784)).astype('float32')
            },
Y
Yang Yang 已提交
199
            fetch=['fc1.w@GRAD'])
Y
Yu Yang 已提交
200

Y
Yang Yang 已提交
201 202 203 204 205 206
    def test_fc_with_tiny_data(self):
        self.run_test(
            callback=self.__network__,
            feed={'img': numpy.random.random(size=(1, 784)).astype('float32')},
            fetch=['fc1.w@GRAD'])

Y
Yang Yang 已提交
207

Y
Yang Yang 已提交
208 209 210
class ParallelOpTestMultipleInput(BaseParallelForTest):
    @staticmethod
    def __network__():
Y
Yang Yu 已提交
211 212 213 214
        x = fluid.layers.data(
            shape=[784], dtype='float32', name='img1', stop_gradient=False)
        y = fluid.layers.data(
            shape=[784], dtype='float32', name='img2', stop_gradient=False)
Y
Yang Yang 已提交
215 216
        yield [x, y]
        x = x + y
Y
Yang Yang 已提交
217 218 219
        hidden1 = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
        hidden2 = fluid.layers.fc(input=hidden1, size=200, param_attr='fc2.w')
        hidden3 = fluid.layers.fc(input=hidden2, size=200, param_attr='fc3.w')
Y
Yu Yang 已提交
220
        loss = fluid.layers.mean(hidden3)
Y
Yang Yang 已提交
221 222 223 224 225 226 227 228 229
        yield loss

    def test_simple_fc(self):
        self.run_test(
            callback=self.__network__,
            feed={
                'img1': numpy.random.random(size=(51, 784)).astype('float32'),
                'img2': numpy.random.random(size=(51, 784)).astype('float32')
            },
Y
Yang Yang 已提交
230
            fetch=['fc1.w@GRAD', 'fc2.w@GRAD', 'fc3.w@GRAD'])
Y
Yang Yang 已提交
231 232


Y
Yang Yang 已提交
233 234
if __name__ == '__main__':
    unittest.main()