test_decoupled_py_reader.py 6.9 KB
Newer Older
S
sneaxiy 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import time
import unittest

18 19 20 21 22
import numpy as np

import paddle
import paddle.fluid as fluid

23 24 25
EPOCH_NUM = 5
BATCH_SIZE = 16
BATCH_NUM = 10
S
sneaxiy 已提交
26 27 28 29
CLASS_NUM = 10


def random_reader():
S
sneaxiy 已提交
30
    np.random.seed(1)
31
    for i in range(BATCH_SIZE * BATCH_NUM):
S
sneaxiy 已提交
32
        image = np.random.random([784])
33
        label = np.random.randint(low=0, high=CLASS_NUM)
S
sneaxiy 已提交
34 35 36
        yield image, label


S
sneaxiy 已提交
37
def simple_fc_net(places, use_legacy_py_reader, use_double_buffer):
C
cnn 已提交
38
    paddle.seed(1)
L
Leo Chen 已提交
39
    paddle.framework.random._manual_program_seed(1)
S
sneaxiy 已提交
40 41 42 43 44
    startup_prog = fluid.Program()
    main_prog = fluid.Program()

    with fluid.unique_name.guard():
        with fluid.program_guard(main_prog, startup_prog):
G
GGBond8488 已提交
45 46 47 48 49
            image = paddle.static.data(
                name='image', shape=[-1, 784], dtype='float32'
            )
            label = paddle.static.data(
                name='label', shape=[-1, 1], dtype='int64'
50 51 52 53 54 55 56
            )
            py_reader = fluid.io.PyReader(
                feed_list=[image, label],
                capacity=4,
                iterable=not use_legacy_py_reader,
                use_double_buffer=use_double_buffer,
            )
S
sneaxiy 已提交
57 58
            hidden = image
            for hidden_size in [10, 20, 30]:
C
Charles-hit 已提交
59
                hidden = paddle.static.nn.fc(
S
sneaxiy 已提交
60 61
                    hidden,
                    size=hidden_size,
C
Charles-hit 已提交
62
                    activation='tanh',
S
sneaxiy 已提交
63
                    bias_attr=fluid.ParamAttr(
64 65 66
                        initializer=fluid.initializer.Constant(value=1.0)
                    ),
                )
S
sneaxiy 已提交
67

C
Charles-hit 已提交
68 69
            predict_label = paddle.static.nn.fc(
                hidden, size=CLASS_NUM, activation='softmax'
70
            )
71
            loss = paddle.mean(
72 73 74 75 76 77
                paddle.nn.functional.cross_entropy(
                    input=predict_label,
                    label=label,
                    reduction='none',
                    use_softmax=False,
                )
78
            )
S
sneaxiy 已提交
79 80 81 82 83 84 85

            optimizer = fluid.optimizer.Adam()
            optimizer.minimize(loss)
    return startup_prog, main_prog, py_reader, loss


class TestBase(unittest.TestCase):
86 87 88 89 90 91 92
    def run_main(
        self,
        use_legacy_py_reader,
        with_data_parallel,
        places,
        use_double_buffer,
    ):
S
sneaxiy 已提交
93 94
        scope = fluid.Scope()
        with fluid.scope_guard(scope):
S
sneaxiy 已提交
95
            startup_prog, main_prog, py_reader, loss = simple_fc_net(
96 97
                places, use_legacy_py_reader, use_double_buffer
            )
S
sneaxiy 已提交
98 99 100 101

            reader = paddle.batch(random_reader, batch_size=BATCH_SIZE)

            ps = places if use_double_buffer else fluid.cpu_places(len(places))
S
sneaxiy 已提交
102 103

            py_reader.decorate_sample_list_generator(
104 105
                reader, places=ps if py_reader.iterable else None
            )
S
sneaxiy 已提交
106

S
sneaxiy 已提交
107 108 109 110 111
            exe = fluid.Executor(place=places[0])
            exe.run(startup_prog)

            prog = fluid.CompiledProgram(main_prog)
            if with_data_parallel:
112 113 114
                prog = prog.with_data_parallel(
                    loss_name=loss.name, places=places
                )
S
sneaxiy 已提交
115 116

            step = 0
S
sneaxiy 已提交
117
            step_list = []
S
sneaxiy 已提交
118
            loss_list = []
S
sneaxiy 已提交
119
            start_t = time.time()
S
sneaxiy 已提交
120
            if not py_reader.iterable:
121
                for _ in range(EPOCH_NUM):
S
sneaxiy 已提交
122
                    step = 0
S
sneaxiy 已提交
123 124 125
                    py_reader.start()
                    while True:
                        try:
126 127 128 129 130
                            (L,) = exe.run(
                                program=prog,
                                fetch_list=[loss],
                                use_program_cache=True,
                            )
S
sneaxiy 已提交
131
                            loss_list.append(np.mean(L))
S
sneaxiy 已提交
132 133 134 135
                            step += 1
                        except fluid.core.EOFException:
                            py_reader.reset()
                            break
S
sneaxiy 已提交
136
                    step_list.append(step)
S
sneaxiy 已提交
137
            else:
138
                for _ in range(EPOCH_NUM):
S
sneaxiy 已提交
139
                    step = 0
S
sneaxiy 已提交
140 141 142 143 144 145 146
                    for d in py_reader():
                        assert len(d) == len(places)
                        for i, item in enumerate(d):
                            image = item['image']
                            label = item['label']
                            assert image.shape() == [BATCH_SIZE, 784]
                            assert label.shape() == [BATCH_SIZE, 1]
147 148
                            assert image._place()._equals(ps[i])
                            assert label._place()._equals(ps[i])
149 150 151 152 153 154
                        (L,) = exe.run(
                            program=prog,
                            feed=d,
                            fetch_list=[loss],
                            use_program_cache=True,
                        )
S
sneaxiy 已提交
155
                        loss_list.append(np.mean(L))
S
sneaxiy 已提交
156
                        step += 1
S
sneaxiy 已提交
157
                    step_list.append(step)
S
sneaxiy 已提交
158
            end_t = time.time()
S
sneaxiy 已提交
159 160 161
            ret = {
                "time": end_t - start_t,
                "step": step_list,
162
                "loss": np.array(loss_list),
S
sneaxiy 已提交
163
            }
S
sneaxiy 已提交
164 165
            return ret

S
sneaxiy 已提交
166
    def prepare_places(self, with_data_parallel, with_cpu=True, with_gpu=True):
S
sneaxiy 已提交
167 168 169 170 171
        places = []
        if with_cpu:
            places.append([fluid.CPUPlace()])
            if with_data_parallel:
                places.append([fluid.CPUPlace()] * 2)
S
sneaxiy 已提交
172

S
sneaxiy 已提交
173
        if with_gpu and fluid.core.is_compiled_with_cuda():
S
sneaxiy 已提交
174 175 176 177 178 179 180 181 182 183
            tmp = fluid.cuda_places()
            assert len(tmp) > 0, "no gpu detected"
            if with_data_parallel:
                places.append(tmp)
            places.append([tmp[0]])
        return places

    def test_main(self):
        for with_data_parallel in [True, False]:
            for p in self.prepare_places(with_data_parallel):
S
sneaxiy 已提交
184
                for use_double_buffer in [False, True]:
S
sneaxiy 已提交
185
                    results = []
S
sneaxiy 已提交
186 187 188 189 190
                    for use_legacy_py_reader in [False, True]:
                        ret = self.run_main(
                            use_legacy_py_reader=use_legacy_py_reader,
                            with_data_parallel=with_data_parallel,
                            places=p,
191 192
                            use_double_buffer=use_double_buffer,
                        )
S
sneaxiy 已提交
193 194 195
                        results.append(ret)
                    if not use_double_buffer:
                        diff = np.max(
196 197
                            np.abs(results[0]['loss'] - results[1]['loss'])
                        )
S
sneaxiy 已提交
198
                        self.assertLess(diff, 1e-3)
S
sneaxiy 已提交
199 200 201 202


if __name__ == '__main__':
    unittest.main()