decorator.py 5.3 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14
# Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

H
Helin Wang 已提交
15 16
__all__ = [
    'map_readers', 'buffered', 'compose', 'chain', 'shuffle',
17
    'ComposeNotAligned', 'firstn'
H
Helin Wang 已提交
18
]
19

20 21
import itertools
import random
Y
Yu Yang 已提交
22 23
from Queue import Queue
from threading import Thread
24 25


H
Helin Wang 已提交
26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
def map_readers(func, *readers):
    """
    Creates a data reader that outputs return value of function using
    output of each data readers as arguments.

    :param func: function to use.
    :param *readers: readers whose outputs will be used as arguments of func.
    :returns: the created data reader.
    """

    def reader():
        rs = []
        for r in readers:
            rs.append(r())
        for e in itertools.imap(func, *rs):
            yield e

    return reader


H
Helin Wang 已提交
46
def shuffle(reader, buf_size):
47 48
    """
    Creates a data reader whose data output is suffled.
49

H
Helin Wang 已提交
50
    Output from the iterator that created by original reader will be
51 52 53
    buffered into shuffle buffer, and then shuffled. The size of shuffle buffer
    is determined by argument buf_size.

54 55
    :param reader: the original reader whose output will be shuffled.
    :param buf_size: shuffle buffer size.
56

57
    :returns:the new reader whose output is shuffled.
58 59
    """

H
Helin Wang 已提交
60
    def data_reader():
61
        buf = []
H
Helin Wang 已提交
62
        for e in reader():
63 64 65 66 67 68 69 70 71 72 73 74
            buf.append(e)
            if len(buf) >= buf_size:
                random.shuffle(buf)
                for b in buf:
                    yield b
                buf = []

        if len(buf) > 0:
            random.shuffle(buf)
            for b in buf:
                yield b

H
Helin Wang 已提交
75
    return data_reader
76 77


H
Helin Wang 已提交
78
def chain(*readers):
79 80 81
    """
    Creates a data reader whose output is the outputs of input data
    readers chained together.
82

H
Helin Wang 已提交
83
    If input readers output following data entries:
84 85 86
    [0, 0, 0]
    [1, 1, 1]
    [2, 2, 2]
H
Helin Wang 已提交
87
    The chained reader will output:
88 89
    [0, 0, 0, 1, 1, 1, 2, 2, 2]

90 91
    :param readers: input readers.
    :returns: the new data reader.
92 93
    """

H
Helin Wang 已提交
94
    def reader():
95
        rs = []
H
Helin Wang 已提交
96
        for r in readers:
97 98 99 100 101
            rs.append(r())

        for e in itertools.chain(*rs):
            yield e

H
Helin Wang 已提交
102
    return reader
103 104


H
Helin Wang 已提交
105
class ComposeNotAligned(ValueError):
106 107 108
    pass


H
Helin Wang 已提交
109
def compose(*readers, **kwargs):
110 111
    """
    Creates a data reader whose output is the combination of input readers.
112

H
Helin Wang 已提交
113
    If input readers output following data entries:
114
    (1, 2)    3    (4, 5)
H
Helin Wang 已提交
115
    The composed reader will output:
116 117
    (1, 2, 3, 4, 5)

118 119 120 121
    :*readers: readers that will be composed together.
    :check_alignment: if True, will check if input readers are aligned
        correctly. If False, will not check alignment and trailing outputs
        will be discarded. Defaults to True.
122

123
    :returns: the new data reader.
124

125 126
    :raises ComposeNotAligned: outputs of readers are not aligned.
        Will not raise when check_alignment is set to False.
127 128 129 130 131 132 133 134 135
    """
    check_alignment = kwargs.pop('check_alignment', True)

    def make_tuple(x):
        if isinstance(x, tuple):
            return x
        else:
            return (x, )

H
Helin Wang 已提交
136
    def reader():
137
        rs = []
H
Helin Wang 已提交
138
        for r in readers:
139 140 141 142 143 144 145 146 147
            rs.append(r())
        if not check_alignment:
            for outputs in itertools.izip(*rs):
                yield sum(map(make_tuple, outputs), ())
        else:
            for outputs in itertools.izip_longest(*rs):
                for o in outputs:
                    if o is None:
                        # None will be not be present if compose is aligned
H
Helin Wang 已提交
148 149
                        raise ComposeNotAligned(
                            "outputs of readers are not aligned.")
150 151
                yield sum(map(make_tuple, outputs), ())

H
Helin Wang 已提交
152
    return reader
153 154


H
Helin Wang 已提交
155
def buffered(reader, size):
156 157
    """
    Creates a buffered data reader.
158

H
Helin Wang 已提交
159 160
    The buffered data reader will read and save data entries into a
    buffer. Reading from the buffered data reader will proceed as long
161
    as the buffer is not empty.
162
    
163 164
    :param reader: the data reader to read from.
    :param size: max buffer size.
165
    
166
    :returns: the buffered data reader.
167 168 169 170 171 172 173 174 175 176 177 178
    """

    class EndSignal():
        pass

    end = EndSignal()

    def read_worker(r, q):
        for d in r:
            q.put(d)
        q.put(end)

H
Helin Wang 已提交
179 180
    def data_reader():
        r = reader()
181 182 183 184 185 186 187 188 189 190 191 192
        q = Queue(maxsize=size)
        t = Thread(
            target=read_worker, args=(
                r,
                q, ))
        t.daemon = True
        t.start()
        e = q.get()
        while e != end:
            yield e
            e = q.get()

H
Helin Wang 已提交
193
    return data_reader
Y
Yu Yang 已提交
194 195


Y
Yu Yang 已提交
196
def firstn(reader, n):
Y
Yu Yang 已提交
197 198 199 200
    """
    Limit the max number of samples that reader could return.
    """

Y
Yu Yang 已提交
201 202 203 204
    # TODO(yuyang18): Check if just drop the reader, could clean the opened
    # resource or not?

    def firstn_reader():
Y
Yu Yang 已提交
205
        for i, item in enumerate(reader()):
Y
Yu Yang 已提交
206
            if i == n:
Y
Yu Yang 已提交
207 208 209
                break
            yield item

Y
Yu Yang 已提交
210
    return firstn_reader