# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from parallel_executor_test_base import TestParallelExecutorBase
import paddle.fluid as fluid
import paddle.fluid.core as core
import numpy as np
import paddle
import paddle.dataset.mnist as mnist
import unittest
import os

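# Path of the recordio file written by TestMNIST.setUpClass and consumed by
# the reader-based versions of the networks below.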
MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio"


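# A four-layer fully-connected MNIST classifier; the input comes either from
# a feed (use_feed=True) or from the recordio reader.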
def simple_fc_net(use_feed):
    if use_feed:
        img = fluid.layers.data(name='image', shape=[784], dtype='float32')
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    else:
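        # When use_feed is False, read (image, label) batches from the
        # recordio file written in TestMNIST.setUpClass.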
        reader = fluid.layers.open_files(
            filenames=[MNIST_RECORDIO_FILE],
            shapes=[[-1, 784], [-1, 1]],
            lod_levels=[0, 0],
            dtypes=['float32', 'int64'],
            thread_num=1,
            for_parallel=True)
        reader = fluid.layers.io.double_buffer(reader)
        img, label = fluid.layers.read_file(reader)
    hidden = img
    for _ in range(4):
        hidden = fluid.layers.fc(
            hidden,
            size=200,
            act='tanh',
            bias_attr=fluid.ParamAttr(
                initializer=fluid.initializer.Constant(value=1.0)))
    prediction = fluid.layers.fc(hidden, size=10, act='softmax')
    loss = fluid.layers.cross_entropy(input=prediction, label=label)
    loss = fluid.layers.mean(loss)
    return loss


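# Like simple_fc_net, but with a batch_norm layer after the FC layer.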
def fc_with_batchnorm(use_feed):
    if use_feed:
        img = fluid.layers.data(name='image', shape=[784], dtype='float32')
        label = fluid.layers.data(name='label', shape=[1], dtype='int64')
    else:
        reader = fluid.layers.open_files(
            filenames=[MNIST_RECORDIO_FILE],
            shapes=[[-1, 784], [-1, 1]],
            lod_levels=[0, 0],
            dtypes=['float32', 'int64'],
            thread_num=1,
            for_parallel=True)
        reader = fluid.layers.io.double_buffer(reader)
        img, label = fluid.layers.read_file(reader)

    hidden = img
    for _ in range(1):
        hidden = fluid.layers.fc(
            hidden,
            size=200,
            act='tanh',
            bias_attr=fluid.ParamAttr(
                initializer=fluid.initializer.Constant(value=1.0)))

        hidden = fluid.layers.batch_norm(input=hidden)

    prediction = fluid.layers.fc(hidden, size=10, act='softmax')
    loss = fluid.layers.cross_entropy(input=prediction, label=label)
    loss = fluid.layers.mean(loss)
    return loss


class TestMNIST(TestParallelExecutorBase):
    @classmethod
    def setUpClass(cls):
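        # CPU_NUM sets how many CPU places ParallelExecutor uses when
        # use_cuda is False.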
        os.environ['CPU_NUM'] = str(4)
        # Convert mnist to recordio file
        with fluid.program_guard(fluid.Program(), fluid.Program()):
            reader = paddle.batch(mnist.train(), batch_size=4)
            feeder = fluid.DataFeeder(
                feed_list=[  # order is image and label
                    fluid.layers.data(
                        name='image', shape=[784]),
                    fluid.layers.data(
                        name='label', shape=[1], dtype='int64'),
                ],
                place=fluid.CPUPlace())
            fluid.recordio_writer.convert_reader_to_recordio_file(
                MNIST_RECORDIO_FILE, reader, feeder)

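    # Builds a fixed 32-sample batch; with random=False the images are all
    # ones, otherwise seeded random values, so runs are reproducible.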
    def _init_data(self, random=True):
        np.random.seed(5)
        if random:
            img = np.random.random(size=[32, 784]).astype(np.float32)
        else:
            img = np.ones(shape=[32, 784], dtype='float32')
        label = np.ones(shape=[32, 1], dtype='int64')
        return img, label

    # simple_fc
    def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
        if use_cuda and not core.is_compiled_with_cuda():
            return
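        # First check convergence when reading from the recordio file (no
        # feed_dict), with and without allow_op_delay.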
        self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
        self.check_network_convergence(
            simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)

        img, label = self._init_data()

        self.check_network_convergence(
            simple_fc_net,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            use_reduce=use_reduce)

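    # Checks convergence with the Reduce strategy and verifies that the
    # first/last losses match the AllReduce strategy within 1e-6.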
    def check_simple_fc_convergence_with_Reduce(self, use_cuda):
        if use_cuda and not core.is_compiled_with_cuda():
            return
        self.check_network_convergence(
            simple_fc_net, use_cuda=use_cuda, use_reduce=True)
        self.check_network_convergence(
            simple_fc_net,
            use_cuda=use_cuda,
            allow_op_delay=True,
            use_reduce=True)

        img, label = self._init_data()

        all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
            simple_fc_net,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            use_reduce=False)
        reduce_first_loss, reduce_last_loss = self.check_network_convergence(
            simple_fc_net,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            use_reduce=True)

        for loss in zip(all_reduce_first_loss, reduce_first_loss):
            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
        for loss in zip(all_reduce_last_loss, reduce_last_loss):
            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)

    def test_simple_fc(self):
        # use_cuda
        self.check_simple_fc_convergence(True)
        self.check_simple_fc_convergence(False)

    def test_simple_fc_with_new_strategy(self):
        # use_cuda, use_reduce
        self.check_simple_fc_convergence_with_Reduce(True)
        self.check_simple_fc_convergence_with_Reduce(False)

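    # Runs the same network with and without ParallelExecutor and checks that
    # each per-device loss matches the single-device loss.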
    def check_simple_fc_parallel_accuracy(self, use_cuda):
        if use_cuda and not core.is_compiled_with_cuda():
            return

        img, label = self._init_data(random=False)

        single_first_loss, single_last_loss = self.check_network_convergence(
            method=simple_fc_net,
            seed=1000,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            use_parallel_executor=False)
        parallel_first_loss, parallel_last_loss = self.check_network_convergence(
            method=simple_fc_net,
            seed=1000,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            use_parallel_executor=True)

        for p_f in parallel_first_loss:
            self.assertAlmostEquals(p_f, single_first_loss[0], delta=1e-6)
        for p_l in parallel_last_loss:
            self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)

    def test_simple_fc_parallel_accuracy(self):
        self.check_simple_fc_parallel_accuracy(True)
        self.check_simple_fc_parallel_accuracy(False)

    def check_batchnorm_fc_convergence(self, use_cuda):
        if use_cuda and not core.is_compiled_with_cuda():
            return
        self.check_network_convergence(fc_with_batchnorm, use_cuda=use_cuda)

        img, label = self._init_data()

        self.check_network_convergence(
            fc_with_batchnorm,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda)

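    # Same AllReduce-vs-Reduce comparison for the batch_norm network; the
    # last-loss tolerance is looser (1e-4).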
    def check_batchnorm_fc_convergence_use_reduce(self, use_cuda):
        if use_cuda and not core.is_compiled_with_cuda():
            return
        self.check_network_convergence(
            fc_with_batchnorm, use_cuda=use_cuda, use_reduce=True)

        img, label = self._init_data()

        all_reduce_first_loss, all_reduce_last_loss = self.check_network_convergence(
            fc_with_batchnorm,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            use_reduce=False)
        reduce_first_loss, reduce_last_loss = self.check_network_convergence(
            fc_with_batchnorm,
            feed_dict={"image": img,
                       "label": label},
            use_cuda=use_cuda,
            use_reduce=True)

        for loss in zip(all_reduce_first_loss, reduce_first_loss):
            self.assertAlmostEquals(loss[0], loss[1], delta=1e-6)
        for loss in zip(all_reduce_last_loss, reduce_last_loss):
            self.assertAlmostEquals(loss[0], loss[1], delta=1e-4)

    def test_batchnorm_fc(self):
        self.check_batchnorm_fc_convergence(True)
        self.check_batchnorm_fc_convergence(False)

    def test_batchnorm_fc_with_new_strategy(self):
        self.check_batchnorm_fc_convergence_use_reduce(True)
        self.check_batchnorm_fc_convergence_use_reduce(False)


if __name__ == '__main__':
    unittest.main()