test_decorator.py 5.5 KB
Newer Older
1
#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
2 3 4 5 6 7 8 9 10 11 12 13
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
D
dzhwinter 已提交
14

15
import functools
16
import sys
17
import time
Y
Yu Yang 已提交
18 19
import unittest

20
import paddle.reader
21

22 23
# This test module exports no public names via ``from module import *``.
__all__ = []

24

H
Helin Wang 已提交
25
def reader_creator_10(dur):
    """Create a reader that yields the integers 0 through 9.

    Args:
        dur: Number of seconds to sleep before each sample is yielded.

    Returns:
        A zero-argument callable; each call returns a fresh generator
        producing 0, 1, ..., 9 in order.
    """

    def reader():
        sample = 0
        while sample < 10:
            # this invocation helps testing paddle.reader.buffer
            time.sleep(dur)
            yield sample
            sample += 1

    return reader
33 34


H
Helin Wang 已提交
35 36 37 38 39 40 41 42 43 44 45
class TestMap(unittest.TestCase):
    """Checks ``paddle.reader.map_readers`` with a single source reader."""

    def test_map(self):
        """Each token read is mapped to its id, in order, with nothing lost."""
        token_ids = {"h": 0, "i": 1}

        def tokenize(x):
            return token_ids[x]

        def read():
            yield "h"
            yield "i"

        r = paddle.reader.map_readers(tokenize, read)
        # Compare the complete output rather than element-by-element inside
        # an enumerate loop: a loop-only check would pass vacuously if the
        # mapped reader yielded nothing (or fewer items than expected).
        self.assertEqual(list(r()), [0, 1])


51 52 53
class TestBuffered(unittest.TestCase):
    """Exercises ``paddle.reader.buffered`` for content and for latency."""

    def test_read(self):
        """Buffer sizes 0..19 must all pass through 0..9 in order."""
        for size in range(20):
            buffered_reader = paddle.reader.buffered(
                reader_creator_10(0), size
            )
            seen = 0
            for expected, item in enumerate(buffered_reader()):
                self.assertEqual(item, expected)
                seen = expected + 1
            self.assertEqual(seen, 10)

    def test_buffering(self):
        """After an initial pause, later reads must return quickly."""
        # The underlying reader sleeps 30ms before every sample.
        buffered_reader = paddle.reader.buffered(reader_creator_10(0.03), 10)
        previous = time.time()
        for item in buffered_reader():
            elapsed_time = time.time() - previous
            if i_is_first := (item == 0):
                # Pause once on the first sample; presumably this gives the
                # buffer time to fill before the timed reads begin.
                time.sleep(1)
            if not i_is_first:
                # read time should be short, meaning already buffered.
                self.assertLess(elapsed_time, 0.08)
            previous = time.time()


75 76
class TestCompose(unittest.TestCase):
    """Checks ``paddle.reader.compose`` with aligned and misaligned readers."""

    # NOTE: the method name keeps its historical spelling ("compse") so that
    # existing test selectors referring to it continue to work.
    def test_compse(self):
        """Two aligned 10-item readers compose into ten ``(i, i)`` tuples."""
        reader = paddle.reader.compose(
            reader_creator_10(0), reader_creator_10(0)
        )
        # Compare the whole output so that an empty or truncated composed
        # reader fails instead of passing vacuously.
        self.assertEqual(list(reader()), [(idx, idx) for idx in range(10)])

    def test_compose_not_aligned(self):
        """A 20-item reader composed with a 10-item one raises when checked."""
        total = 0
        reader = paddle.reader.compose(
            paddle.reader.chain(reader_creator_10(0), reader_creator_10(0)),
            reader_creator_10(0),
        )
        with self.assertRaises(paddle.reader.ComposeNotAligned):
            for _ in reader():
                total += 1
        # expecting 10, not 20
        self.assertEqual(total, 10)

    def test_compose_not_aligned_no_check(self):
        """With ``check_alignment=False`` the output stops after 10 items."""
        reader = paddle.reader.compose(
            paddle.reader.chain(reader_creator_10(0), reader_creator_10(0)),
            reader_creator_10(0),
            check_alignment=False,
        )
        total = sum(1 for _ in reader())
        # expecting 10, not 20
        self.assertEqual(total, 10)


class TestChain(unittest.TestCase):
    """Checks that ``paddle.reader.chain`` concatenates its readers."""

    def test_chain(self):
        """Two chained 10-item readers yield 0..9 twice, 20 items in total."""
        chained = paddle.reader.chain(
            reader_creator_10(0), reader_creator_10(0)
        )
        count = 0
        for position, item in enumerate(chained()):
            self.assertEqual(item, position % 10)
            count = position + 1
        self.assertEqual(count, 20)


class TestShuffle(unittest.TestCase):
    """Checks ``paddle.reader.shuffle`` across several buffer sizes."""

    def test_shuffle(self):
        """Every buffer size yields 10 items; sizes 0 and 1 keep the order."""
        source = reader_creator_10(0)
        # (buffer size, whether the output order is compared to the input)
        cases = ((0, True), (1, True), (10, False), (100, False))
        for buf_size, expect_ordered in cases:
            shuffled = paddle.reader.shuffle(source, buf_size)
            count = 0
            for position, item in enumerate(shuffled()):
                if expect_ordered:
                    self.assertEqual(position, item)
                count += 1
            self.assertEqual(count, 10)

W
wanghaoshuang 已提交
131

132 133 134
class TestXmap(unittest.TestCase):
    """Checks ``paddle.reader.xmap_readers`` over thread/buffer/order combos."""

    def test_xmap(self):
        """Every configuration must map all ten samples, on repeated reads."""

        def mapper(x):
            return x + 1

        # Full expected output; comparing against it (instead of enumerating
        # whatever came back) also catches empty or truncated results.
        expected = [mapper(idx) for idx in range(10)]

        orders = (True, False)
        thread_nums = (1, 2, 4, 8, 16)
        buffered_size = (1, 2, 4, 8, 16)
        for order in orders:
            for thread_num in thread_nums:
                for size in buffered_size:
                    reader = paddle.reader.xmap_readers(
                        mapper, reader_creator_10(0), thread_num, size, order
                    )
                    # The same reader object is invoked three times in a row.
                    for _ in range(3):
                        result = list(reader())
                        if not order:
                            # Unordered mode: only the set of values is
                            # checked, so normalise the order first.
                            result.sort()
                        self.assertEqual(result, expected)
W
wanghaoshuang 已提交
154

155

Q
Qiao Longfei 已提交
156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174
class TestMultiProcessReader(unittest.TestCase):
    """Checks that ``paddle.reader.multiprocess_reader`` merges all samples."""

    def setup(self):
        """Build 1000 samples and three readers that partition them.

        This is not the ``unittest`` ``setUp`` hook; ``reader_test`` calls it
        explicitly. Reader ``k`` yields the samples whose position is
        congruent to ``k`` modulo 3.
        """
        self.samples = [[[i], [i + 1, i + 2], i + 3] for i in range(1000)]

        def reader(index):
            for position, sample in enumerate(self.samples):
                if position % 3 == index:
                    yield sample

        self.reader0 = functools.partial(reader, 0)
        self.reader1 = functools.partial(reader, 1)
        self.reader2 = functools.partial(reader, 2)

    def reader_test(self, use_pipe):
        """Read through the three readers and compare with the samples.

        Args:
            use_pipe: Forwarded as the third positional argument of
                ``paddle.reader.multiprocess_reader``.
        """
        self.setup()
        merged = paddle.reader.multiprocess_reader(
            [self.reader0, self.reader1, self.reader2], 100, use_pipe
        )
        results = list(merged())
        # Both sides are sorted, so arrival order is not compared.
        self.assertEqual(sorted(self.samples), sorted(results))

    # Report the test as skipped on Windows instead of letting it pass
    # without running anything.
    @unittest.skipIf(
        sys.platform == 'win32',
        "multiprocess_reader is not exercised on Windows",
    )
    def test_distributed_batch_reader(self):
        """Run the merge check with ``use_pipe`` both off and on."""
        for use_pipe in (False, True):
            with self.subTest(use_pipe=use_pipe):
                self.reader_test(use_pipe=use_pipe)
Q
Qiao Longfei 已提交
184 185


186 187
# Run the unittest command-line runner when this file is executed as a script.
if __name__ == '__main__':
    unittest.main()