#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function

import unittest

import paddle.fluid as fluid
from paddle.fluid.layers.device import get_places
from paddle.fluid.layers.control_flow import ParallelDo
import paddle.fluid.profiler as profiler
import numpy
import six


class BaseParallelForTest(unittest.TestCase):
    def run_test(self, callback, feed, fetch):
        """
        Run the unittest for parallel.for
        Args:
            callback(callable): A callable that returns a generator. The
                generator yields twice: the first yield returns the data
                layers, and the second yield returns the loss. The modified
                data variables are sent back to the generator at the first
                yield.

            feed(dict): The feed dictionary for the executor.
            fetch(list|str): The fetch name list, or a single fetch name.

        Returns:
            None

        Raises:
            AssertionError: If the results of plain CPU, parallel.for on CPU,
                plain GPU, and parallel.for on GPU differ from each other.
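
        Example (an illustrative sketch of the expected callback; the layer
        shape, size and parameter name below are made up for this docstring):

            def __network__():
                img = fluid.layers.data(shape=[4], dtype='float32', name='x')
                img = yield img  # first yield: hand back the data layer(s)
                hidden = fluid.layers.fc(input=img, size=8, param_attr='w')
                yield fluid.layers.mean(hidden)  # second yield: the loss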

        """
        cpu = fluid.CPUPlace()
        result_cpu = self._run_test_impl_(
            callback=callback,
            feed=feed,
            fetch=fetch,
            place=cpu,
            use_parallel=False)
        result_cpu_parallel = self._run_test_impl_(
            callback=callback,
            feed=feed,
            fetch=fetch,
            place=cpu,
            use_parallel=True)
        if fluid.core.is_compiled_with_cuda():
            gpu = fluid.CUDAPlace(0)
            result_gpu = self._run_test_impl_(
                callback=callback,
                feed=feed,
                fetch=fetch,
                place=gpu,
                use_parallel=False,
                use_gpu=True)
            result_gpu_parallel = self._run_test_impl_(
                callback=callback,
                feed=feed,
                fetch=fetch,
                place=gpu,
                use_parallel=True,
                use_gpu=True)
            result_gpu_nccl = self._run_test_impl_(
                callback=callback,
                feed=feed,
                fetch=fetch,
                place=gpu,
                use_parallel=True,
                use_nccl=True,
                use_gpu=True)
            self._assert_same_(fetch, result_cpu, result_cpu_parallel,
                               result_gpu, result_gpu_parallel, result_gpu_nccl)
        else:
            self._assert_same_(fetch, result_cpu, result_cpu_parallel)

    def _run_test_impl_(self,
                        callback,
                        feed,
                        fetch,
                        place,
                        use_parallel=False,
                        use_nccl=False,
                        use_gpu=False):
        """
        Run a single test, returns the fetch values
        Args:
102 103
            place(Place): the computation place.
            use_parallel(bool): Whether use parallel.for or not.
Y
Yang Yu 已提交
104 105 106 107 108

        Returns:
            Fetched numpy arrays.

        """
        if isinstance(fetch, six.string_types):
            fetch = [fetch]
        main = fluid.Program()
        startup = fluid.Program()
        # Fix seed
        main.random_seed = 10
        startup.random_seed = 10

        with fluid.program_guard(main, startup):
            generator = callback()
            # Automatically insert parallel do if use_parallel = True
            if use_parallel:
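                # Parallel path: replicate the user network across several
                # places with ParallelDo (one place per CUDA device on GPU,
                # eight places on CPU); ParallelDo splits the input among the
                # places and merges the per-place losses when it is called.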
                thread_num = (fluid.core.get_cuda_device_count()
                              if use_gpu else 8)
                places = get_places(thread_num)
                pd = ParallelDo(places, use_nccl=use_nccl)
                data = next(generator)

                if isinstance(data, fluid.framework.Variable):
                    data = [data]

                with pd.do():
                    ins = list(map(pd.read_input, data))
                    if len(ins) == 1:
                        ins = ins[0]
                    loss = generator.send(ins)  # patch input
                    pd.write_output(loss)

                loss = pd()
            else:
                data = next(generator)
                loss = generator.send(data)
            self.assertIsNotNone(loss)
            avg_loss = fluid.layers.mean(loss)
            fluid.backward.append_backward(loss=avg_loss)

        exe = fluid.Executor(place)
        exe.run(startup)
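        # Profile the final run: the profiler state matches the place type and
        # the report (sorted by total time) is written to /tmp/profiler.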
        if use_gpu:
            profile_type = 'GPU'
        else:
            profile_type = 'CPU'
        with profiler.profiler(profile_type, 'total', '/tmp/profiler'):
            return exe.run(main, feed=feed, fetch_list=fetch)

    def _assert_same_(self, fetch, *args):
        """
        Assert that the return values of `run_test` are the same.
        Args:
            fetch: Fetch list, used for printing the error message.
            *args: The fetch result lists of each configuration.

        Returns:
            None

        Raises:
            AssertionError

        """

        def _impl_(a, b, fetch_id, item_id):
            item_str = [
                'CPU', 'ParallelCPU', 'GPU', 'ParallelGPU', 'ParallelGPUNCCL'
            ]
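            # The order of item_str must match the order in which run_test
            # passes the result lists to _assert_same_.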
            flag = numpy.allclose(a, b, rtol=0.1, atol=1e-3)
            self.assertTrue(flag,
                            "The {0} are different in {1}, {2} vs {3}".format(
                                fetch[fetch_id], item_str[item_id], a, b))

        for i, items in enumerate(zip(*args)):
            self.assertGreater(len(items), 0)
            for j in range(1, len(items)):
                _impl_(items[0], items[j], fetch_id=i, item_id=j)


class ParallelOpTest(BaseParallelForTest):
    @staticmethod
    def __network__():
        x = fluid.layers.data(shape=[784], dtype='float32', name='img')
        x = yield x
        hidden = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
        hidden = fluid.layers.batch_norm(input=hidden)
        loss = fluid.layers.mean(hidden)
        yield loss

    def test_simple_fc(self):
        self.run_test(
            callback=self.__network__,
            feed={
                'img': numpy.random.random(size=(51, 784)).astype('float32')
            },
            fetch=['fc1.w@GRAD'])

    def test_fc_with_tiny_data(self):
        self.run_test(
            callback=self.__network__,
            feed={'img': numpy.random.random(size=(1, 784)).astype('float32')},
            fetch=['fc1.w@GRAD'])


class ParallelOpTestMultipleInput(BaseParallelForTest):
    @staticmethod
    def __network__():
        x = fluid.layers.data(
            shape=[784], dtype='float32', name='img1', stop_gradient=False)
        y = fluid.layers.data(
            shape=[784], dtype='float32', name='img2', stop_gradient=False)
        yield [x, y]
        x = x + y
        hidden1 = fluid.layers.fc(input=x, size=200, param_attr='fc1.w')
        hidden2 = fluid.layers.fc(input=hidden1, size=200, param_attr='fc2.w')
        hidden3 = fluid.layers.fc(input=hidden2, size=200, param_attr='fc3.w')
        loss = fluid.layers.mean(hidden3)
        yield loss

    def test_simple_fc(self):
        self.run_test(
            callback=self.__network__,
            feed={
                'img1': numpy.random.random(size=(51, 784)).astype('float32'),
                'img2': numpy.random.random(size=(51, 784)).astype('float32')
            },
            fetch=['fc1.w@GRAD', 'fc2.w@GRAD', 'fc3.w@GRAD'])


if __name__ == '__main__':
    unittest.main()