test_data_balance.py 7.9 KB
Newer Older
F
fengjiayi 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import unittest
import paddle.fluid as fluid
import paddle.v2 as paddle
import numpy as np


class TestDataBalance(unittest.TestCase):
    def prepare_data(self):
        def fake_data_generator():
            for n in xrange(self.total_ins_num):
                yield np.ones((3, 4)) * n, n

        # Prepare data
        with fluid.program_guard(fluid.Program(), fluid.Program()):
            reader = paddle.batch(
                fake_data_generator, batch_size=self.batch_size)
            feeder = fluid.DataFeeder(
                feed_list=[
                    fluid.layers.data(
                        name='image', shape=[3, 4], dtype='float32'),
                    fluid.layers.data(
                        name='label', shape=[1], dtype='int64'),
                ],
                place=fluid.CPUPlace())
            self.num_batches = fluid.recordio_writer.convert_reader_to_recordio_file(
                self.data_file_name, reader, feeder)

    def prepare_lod_data(self):
        def fake_data_generator():
            for n in xrange(1, self.total_ins_num + 1):
                d1 = (np.ones((n, 3)) * n).astype('float32')
                d2 = (np.array(n).reshape((1, 1))).astype('int32')
                yield d1, d2

        # Prepare lod data
        with fluid.program_guard(fluid.Program(), fluid.Program()):
            with fluid.recordio_writer.create_recordio_writer(
                    filename=self.lod_data_file_name) as writer:
                eof = False
                generator = fake_data_generator()
                while (not eof):
                    data_batch = [
                        np.array([]).reshape((0, 3)), np.array([]).reshape(
                            (0, 1))
                    ]
                    lod = [0]
                    for _ in xrange(self.batch_size):
                        try:
                            ins = generator.next()
                        except StopIteration:
                            eof = True
                            break
                        for i, d in enumerate(ins):
                            data_batch[i] = np.concatenate(
                                (data_batch[i], d), axis=0)
                        lod.append(lod[-1] + ins[0].shape[0])
                    if data_batch[0].shape[0] > 0:
                        for i, d in enumerate(data_batch):
                            t = fluid.LoDTensor()
                            t.set(data_batch[i], fluid.CPUPlace())
                            if i == 0:
                                t.set_lod([lod])
                            writer.append_tensor(t)
                        writer.complete_append_tensor()

    def setUp(self):
        self.use_cuda = fluid.core.is_compiled_with_cuda()
        self.data_file_name = './data_balance_test.recordio'
        self.lod_data_file_name = './data_balance_with_lod_test.recordio'
        self.total_ins_num = 50
        self.batch_size = 10
        self.prepare_data()
        self.prepare_lod_data()

    def main(self):
        main_prog = fluid.Program()
        startup_prog = fluid.Program()
        with fluid.program_guard(main_prog, startup_prog):
            data_reader = fluid.layers.io.open_files(
                filenames=[self.data_file_name],
                shapes=[[-1, 3, 4], [-1, 1]],
                lod_levels=[0, 0],
                dtypes=['float32', 'int64'])
            if self.use_cuda:
                data_reader = fluid.layers.double_buffer(data_reader)
            image, label = fluid.layers.read_file(data_reader)

            place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace()
            exe = fluid.Executor(place)
            exe.run(startup_prog)

Y
yuyang18 已提交
106 107
            build_strategy = fluid.BuildStrategy()
            build_strategy.enable_data_balance = True
F
fengjiayi 已提交
108
            parallel_exe = fluid.ParallelExecutor(
Y
yuyang18 已提交
109 110 111
                use_cuda=self.use_cuda,
                main_program=main_prog,
                build_strategy=build_strategy)
F
fengjiayi 已提交
112 113 114 115 116 117 118 119 120 121 122 123 124

            if (parallel_exe.device_count > self.batch_size):
                print("WARNING: Unittest TestDataBalance skipped. \
                    For the result is not correct when device count \
                    is larger than batch size.")
                exit(0)
            fetch_list = [image.name, label.name]

            data_appeared = [False] * self.total_ins_num
            while (True):
                try:
                    image_val, label_val = parallel_exe.run(fetch_list,
                                                            return_numpy=True)
125
                except fluid.core.EOFException:
F
fengjiayi 已提交
126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144
                    break
                ins_num = image_val.shape[0]
                broadcasted_label = np.ones(
                    (ins_num, 3, 4)) * label_val.reshape((ins_num, 1, 1))
                self.assertEqual(image_val.all(), broadcasted_label.all())
                for l in label_val:
                    self.assertFalse(data_appeared[l[0]])
                    data_appeared[l[0]] = True
            for i in data_appeared:
                self.assertTrue(i)

    def main_lod(self):
        main_prog = fluid.Program()
        startup_prog = fluid.Program()
        with fluid.program_guard(main_prog, startup_prog):
            data_reader = fluid.layers.io.open_files(
                filenames=[self.lod_data_file_name],
                shapes=[[-1, 3], [-1, 1]],
                lod_levels=[1, 0],
Y
yuyang18 已提交
145
                dtypes=['float32', 'int32'])
F
fengjiayi 已提交
146 147 148 149 150
            ins, label = fluid.layers.read_file(data_reader)

            place = fluid.CUDAPlace(0) if self.use_cuda else fluid.CPUPlace()
            exe = fluid.Executor(place)
            exe.run(startup_prog)
Y
yuyang18 已提交
151 152
            build_strategy = fluid.BuildStrategy()
            build_strategy.enable_data_balance = True
F
fengjiayi 已提交
153
            parallel_exe = fluid.ParallelExecutor(
Y
yuyang18 已提交
154 155 156
                use_cuda=self.use_cuda,
                main_program=main_prog,
                build_strategy=build_strategy)
F
fengjiayi 已提交
157

Y
yuyang18 已提交
158
            if parallel_exe.device_count > self.batch_size:
F
fengjiayi 已提交
159 160 161 162 163 164 165 166 167 168 169
                print("WARNING: Unittest TestDataBalance skipped. \
                    For the result is not correct when device count \
                    is larger than batch size.")
                exit(0)
            fetch_list = [ins.name, label.name]

            data_appeared = [False] * self.total_ins_num
            while (True):
                try:
                    ins_tensor, label_tensor = parallel_exe.run(
                        fetch_list, return_numpy=False)
170
                except fluid.core.EOFException:
F
fengjiayi 已提交
171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191
                    break

                ins_val = np.array(ins_tensor)
                label_val = np.array(label_tensor)
                ins_lod = ins_tensor.lod()[0]
                self.assertEqual(ins_val.shape[1], 3)
                self.assertEqual(label_val.shape[1], 1)
                self.assertEqual(len(ins_lod) - 1, label_val.shape[0])
                for i in range(0, len(ins_lod) - 1):
                    ins_elem = ins_val[ins_lod[i]:ins_lod[i + 1]][:]
                    label_elem = label_val[i][0]
                    self.assertEqual(ins_elem.all(), label_elem.all())
                    self.assertFalse(data_appeared[int(label_elem - 1)])
                    data_appeared[int(label_elem - 1)] = True

            for i in data_appeared:
                self.assertTrue(i)

    def test_all(self):
        self.main()
        self.main_lod()
Y
yuyang18 已提交
192 193 194 195


if __name__ == '__main__':
    unittest.main()