#   Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from . import core
import numpy as np
import os
import multiprocessing
import warnings
import struct

from .framework import (
    Variable,
    default_main_program,
    _current_expected_place,
    _non_static_mode,
    _in_eager_without_dygraph_check,
)
from .framework import _cpu_num, _cuda_ids

__all__ = ['DataFeeder']

_PADDLE_DTYPE_2_NUMPY_DTYPE = {
    core.VarDesc.VarType.BOOL: 'bool',
    core.VarDesc.VarType.FP16: 'float16',
    core.VarDesc.VarType.BF16: 'uint16',
    core.VarDesc.VarType.FP32: 'float32',
    core.VarDesc.VarType.FP64: 'float64',
    core.VarDesc.VarType.INT8: 'int8',
    core.VarDesc.VarType.INT16: 'int16',
    core.VarDesc.VarType.INT32: 'int32',
    core.VarDesc.VarType.INT64: 'int64',
    core.VarDesc.VarType.UINT8: 'uint8',
    core.VarDesc.VarType.COMPLEX64: 'complex64',
    core.VarDesc.VarType.COMPLEX128: 'complex128',
}
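# NOTE: core.VarDesc.VarType.BF16 maps to 'uint16' above because NumPy has no
# native bfloat16 dtype; see convert_float_to_uint16 below for the bit-level
# cast.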


def copy_bits_from_float_to_uint16(f):
    return struct.unpack('<I', struct.pack('<f', f))[0] >> 16
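
# Illustrative check (a sketch, not part of the public API): bfloat16 keeps
# the upper 16 bits of an IEEE-754 float32, so 1.0 (bit pattern 0x3F800000)
# maps to 0x3F80:
#
#     copy_bits_from_float_to_uint16(1.0)  # -> 0x3F80 (i.e. 16256)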


def convert_float_to_uint16(data, data_format="NCHW"):
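    # Reinterpret a float32 ndarray as bfloat16 bit patterns stored in uint16.
    # "NHWC" input is transposed to "NCHW" before the element-wise conversion
    # and transposed back afterwards.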
    if data.size == 0:
        return data.view(np.uint16)

    if data_format == "NHWC":
        data = np.transpose(data, [0, 3, 1, 2])

    new_data = []
    for x in np.nditer(data):
        new_data.append(np.uint16(copy_bits_from_float_to_uint16(x)))
    new_data = np.reshape(new_data, data.shape).view(np.uint16)

    if data_format == "NHWC":
        new_data = np.transpose(new_data, [0, 2, 3, 1])
    return new_data


def convert_dtype(dtype):
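    # Normalize a dtype given as a Paddle VarType, a NumPy scalar type, a
    # np.dtype or a string into its canonical NumPy dtype name. An
    # illustrative sketch of the mapping:
    #
    #     convert_dtype(core.VarDesc.VarType.FP32)  # -> 'float32'
    #     convert_dtype(np.float32)                 # -> 'float32'
    #     convert_dtype('bfloat16')                 # -> 'uint16'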
    if isinstance(dtype, core.VarDesc.VarType):
        if dtype in _PADDLE_DTYPE_2_NUMPY_DTYPE:
            return _PADDLE_DTYPE_2_NUMPY_DTYPE[dtype]
    elif isinstance(dtype, type):
        # This branch is for NumPy scalar types
        if dtype in [
            bool,
            np.float16,
            np.uint16,
            np.float32,
            np.float64,
            np.int8,
            np.int16,
            np.int32,
            np.int64,
            np.uint8,
            np.complex64,
            np.complex128,
        ]:
            return dtype.__name__
    else:
        # This branch is for np.dtype and str
        if dtype in [
            'bool',
            'float16',
            'uint16',
            'float32',
            'float64',
            'int8',
            'int16',
            'int32',
            'int64',
            'uint8',
            'complex64',
            'complex128',
        ]:
            # NOTE(SigureMo): Since the np.dtype object is not an instance of
            # type, it is not handled by the previous branch. We need to
            # convert it to str here.
            return str(dtype)
        # NOTE(zhangbo): NumPy does not support bfloat16 yet, so we use
        # numpy.uint16 to represent paddle.bfloat16; their binary layouts are
        # consistent. When casting an ndarray to uint16 before converting it to
        # a tensor, do not call ndarray.astype('uint16') directly; use the
        # function convert_float_to_uint16 above, otherwise the bits are wrong.
        if dtype in ['bfloat16']:
            return 'uint16'

    raise TypeError(
        "dtype must be any of [bool, float16, uint16, float32, float64, int8, int16, "
        "int32, int64, uint8, complex64, complex128, bfloat16], but received %s"
        % dtype
    )


def check_variable_and_dtype(
    input, input_name, expected_dtype, op_name, extra_message=''
):
    check_type(input, input_name, Variable, op_name, extra_message)
    check_dtype(input.dtype, input_name, expected_dtype, op_name, extra_message)


def check_type(input, input_name, expected_type, op_name, extra_message=''):
    # NOTE [ Why skip dynamic graph check ]:
    # 1. If the input type / dtype of a layer is wrong, it will be reported
    # directly on the offending line, so the user can easily locate the
    # relevant information. It is easier to debug there, so there is no need
    # to check in dynamic graph mode.
    # 2. Performance. These checks would run at every step in dynamic graph
    # mode, which would impose a heavy performance burden.
    if _non_static_mode():
        return

    # NOTE: `in_declarative_mode` is used to determine whether this op is
    # called under @to_static during the transformation from dygraph to a
    # static layer. We add Tensor to expected_type to skip the check, because
    # a Tensor may be created and used in unusual ways under @to_static.
    from .dygraph.base import in_declarative_mode

    # Need a better design to fix this.
    if in_declarative_mode():
        if not isinstance(expected_type, tuple):
            expected_type = (expected_type,)
        expected_type += (core.eager.Tensor,)
    elif isinstance(input, core.eager.Tensor):
        raise TypeError(
            "Please use `with fluid.dygraph.guard()` as context or `fluid.enable_dygraph()` to switch to imperative mode first. "
            "Because '{}' received in {} is an imperative Variable.".format(
                input_name, op_name
            )
        )
    if not isinstance(input, expected_type):
        raise TypeError(
            "The type of '%s' in %s must be %s, but received %s. %s"
            % (input_name, op_name, expected_type, type(input), extra_message)
        )


def check_dtype(
    input_dtype, input_name, expected_dtype, op_name, extra_message=''
):
    # See NOTE [ Why skip dynamic graph check ]
    if _non_static_mode():
        return
    if convert_dtype(input_dtype) in ['float16']:
        warnings.warn(
            "The data type of '%s' in %s only support float16 in GPU now. %s"
            % (input_name, op_name, extra_message)
        )
    if convert_dtype(input_dtype) in ['uint16'] and op_name not in [
        'reshape',
        'lookup_table',
        'scale',
    ]:
        warnings.warn(
            "The data type of '%s' in %s only support bfloat16 in OneDNN now. %s"
            % (input_name, op_name, extra_message)
        )
    if convert_dtype(input_dtype) not in expected_dtype:
        raise TypeError(
            "The data type of '%s' in %s must be %s, but received %s. %s"
            % (
                input_name,
                op_name,
                expected_dtype,
                convert_dtype(input_dtype),
                extra_message,
            )
        )
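
# Typical usage inside an op definition (an illustrative sketch; `x` and
# 'my_op' are hypothetical names):
#
#     check_variable_and_dtype(x, 'x', ['float32', 'float64'], 'my_op')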


def check_shape(
    shape,
    op_name,
    expected_shape_type=(list, tuple, Variable),
    expected_element_type=(int, Variable),
    expected_tensor_dtype=('int32', 'int64'),
):
    # See NOTE [ Why skip dynamic graph check ]
    if _non_static_mode():
        return
    check_type(shape, 'shape', expected_shape_type, op_name)
    if expected_element_type is not None and not isinstance(shape, Variable):
        for item in shape:
            check_type(item, 'element of shape', expected_element_type, op_name)
            if expected_tensor_dtype is not None and isinstance(item, Variable):
                check_dtype(
                    item.dtype,
                    'element of shape',
                    expected_tensor_dtype,
                    op_name,
                    'If element of shape is Tensor, its data type should be {}'.format(
                        ', '.join(expected_tensor_dtype)
                    ),
                )
    if expected_tensor_dtype is not None and isinstance(shape, Variable):
        check_dtype(shape.dtype, 'shape', expected_tensor_dtype, op_name)
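
# Illustrative sketch: `shape` may be a list/tuple of Python ints (optionally
# containing 1-D int32/int64 Tensors), or a single int32/int64 Tensor; 'my_op'
# is a hypothetical name:
#
#     check_shape([2, 3], 'my_op')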


class DataToLoDTensorConverter:
    def __init__(self, place, lod_level, shape, dtype):
        self.place = place
        self.lod_level = lod_level
        self.shape = shape
        # If more than one dimension is negative (i.e. unknown), the target
        # shape cannot be inferred, so fall back to the shape of the data.
        negative_count = 0
        for s in self.shape:
            if s < 0:
                negative_count += 1
            if negative_count > 1:
                self.shape = None
                break
        self.dtype = convert_dtype(dtype)
        self._reset()

    def _reset(self):
        self.data = []
        self.lod = [[] for _ in range(self.lod_level)]

    def feed(self, data):
        self._feed_impl_(data, self.lod, self.lod_level)

    def _feed_impl_(self, data, lod, lod_level):
        if lod_level == 0:
            self.data.append(data)
        else:
            lod[0].append(len(data))
            for each_data in data:
                self._feed_impl_(each_data, lod[1:], lod_level - 1)
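
    # Illustrative sketch: with lod_level=1, feeding the sample [1, 2, 3] and
    # then [4, 5] leaves self.data == [1, 2, 3, 4, 5] and self.lod == [[3, 2]],
    # i.e. the recursive sequence lengths set on the LoDTensor in done() below.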

    def _check_shape(self, shape):
        for s1, s2 in zip(self.shape, shape):
            if s1 != s2 and s1 >= 0 and s2 >= 0:
                raise ValueError(
                    "Shape not match. What is defined in data layer is {}, but receive {}".format(
                        self.shape, shape
                    )
                )

    def done(self):
        arr = np.array(self.data, dtype=self.dtype)
        if self.shape:
            if len(arr.shape) != len(self.shape):
                try:
                    arr = arr.reshape(self.shape)
                except ValueError:
                    raise ValueError(
                        "Reshape error. What is defined in data layer is {}, but receive {}".format(
                            self.shape, arr.shape
                        )
                    )
        t = core.LoDTensor()
        t.set(arr, self.place)
        if self.lod_level > 0:
            t.set_recursive_sequence_lengths(self.lod)
        self._reset()
        return t
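
# Minimal usage sketch for the converter (illustrative only):
#
#     conv = DataToLoDTensorConverter(
#         place=core.CPUPlace(), lod_level=0, shape=[-1, 2],
#         dtype=core.VarDesc.VarType.FP32)
#     conv.feed([1.0, 2.0])
#     conv.feed([3.0, 4.0])
#     t = conv.done()  # a 2x2 float32 LoDTensor on the CPU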


class BatchedTensorProvider:
    def __init__(self, feed_list, place, batch_size, generator, drop_last):
        self.place = place
        self.batch_size = batch_size
        self.generator = generator
        self.converters = []
        self.drop_last = drop_last

        for var in feed_list:
            assert var.lod_level == 0, "lod_level must be 0"
            self.converters.append(
                DataToLoDTensorConverter(
                    place=self.place,
                    lod_level=0,
                    shape=var.shape,
                    dtype=var.dtype,
                )
            )

    def _done(self):
        return [c.done() for c in self.converters]

    def __call__(self):
        idx = 0
        for each_sample in self.generator():
            for each_slot, each_converter in zip(each_sample, self.converters):
                each_converter.data.append(each_slot)

            idx += 1
            if idx == self.batch_size:
                idx = 0
                yield self._done()

        if not self.drop_last and idx > 0:
            yield self._done()
        else:
            for c in self.converters:
                c._reset()
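
# Usage sketch (illustrative; `feed_vars` is a hypothetical list of
# lod_level-0 Variables and `sample_gen` a hypothetical sample generator):
#
#     provider = BatchedTensorProvider(
#         feed_list=feed_vars, place=core.CPUPlace(), batch_size=32,
#         generator=sample_gen, drop_last=True)
#     for batch in provider():  # each batch: one LoDTensor per feed variable
#         ...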


class DataFeeder:
    """
    :api_attr: Static Graph

    DataFeeder converts the data returned by a reader into a data
    structure that can be fed into Executor. The reader is usually a
    Python generator that returns a list of mini-batch data entries.

    Parameters:
        feed_list (list): Variables or names of Variables that need
            to be fed.
        place (:ref:`api_fluid_CPUPlace` | :ref:`api_fluid_CUDAPlace` ):
            place indicates the device (CPU | GPU) the data will be fed into. If
            you want to feed data into GPU, please use :code:`fluid.CUDAPlace(i)`
            (:code:`i` represents the GPU id); if you want to feed data into CPU,
            please use :code:`fluid.CPUPlace()`.
        program (:ref:`api_fluid_Program` , optional): The Program that the data
            will be fed into. If program is None, default_main_program() is
            used. Default None.

    Raises:
        :code:`ValueError` - If some Variables are not in this Program.

    Example:
        ..  code-block:: python

            import numpy as np
            import paddle
            import paddle.fluid as fluid

            place = fluid.CPUPlace()
            def reader():
                for _ in range(4):
                    yield np.random.random([4]).astype('float32'), np.random.random([3]).astype('float32'),

            main_program = fluid.Program()
            startup_program = fluid.Program()

            with fluid.program_guard(main_program, startup_program):
                data_1 = paddle.static.data(name='data_1', shape=[None, 2, 2], dtype='float32')
                data_2 = paddle.static.data(name='data_2', shape=[None, 1, 3], dtype='float32')
                out = paddle.static.nn.fc(x=[data_1, data_2], size=2)
                # ...
            feeder = fluid.DataFeeder([data_1, data_2], place)

            exe = fluid.Executor(place)
            exe.run(startup_program)

            feed_data = feeder.feed(reader())

            # print feed_data to view feed results
            # print(feed_data['data_1'])
            # print(feed_data['data_2'])

            outs = exe.run(program=main_program,
                            feed=feed_data,
                            fetch_list=[out])
            print(outs)

    """

    def __init__(self, feed_list, place, program=None):
        self.feed_dtypes = []
        self.feed_names = []
        self.feed_shapes = []
        self.feed_lod_level = []
        if program is None:
            program = default_main_program()
        for each_var in feed_list:
            if isinstance(each_var, str):
                each_var = program.block(0).var(each_var)
            if not isinstance(each_var, Variable):
                raise TypeError("Feed list should contain a list of variable")
            self.feed_dtypes.append(each_var.dtype)
            self.feed_names.append(each_var.name)
            self.feed_lod_level.append(each_var.lod_level)
            self.feed_shapes.append(each_var.shape)

        self.place = place

    def feed(self, iterable):
        """
        According to the :code:`feed_list` of the :code:`DataFeeder` and the given :code:`iterable` , converts
        the input into a data structure that can be fed into Executor.

        Parameters:
            iterable (generator): user-defined Python generator to read the raw input data

        Returns:
            :code:`dict`: a :code:`dict` that contains (variable name - converted tensor) pairs

        Example:
            ..  code-block:: python

                # In this example, the reader generator returns a tuple of 3 ndarrays per sample.
                # The feed API converts each ndarray input into a tensor.
                # The returned result is a dict with keys: data_1, data_2, data_3.
                # result['data_1'] is a LoD-Tensor with shape [5, 2, 1, 3]: 5 is the batch size, and [2, 1, 3] is the real shape of data_1.
                # result['data_2'] and result['data_3'] are similar.
                import numpy as np
                import paddle
                import paddle.fluid as fluid

                def reader(limit=5):
                    for i in range(1, limit + 1):
                        yield np.ones([6]).astype('float32') * i, np.ones([1]).astype('int64') * i, np.random.random([9]).astype('float32')

                data_1 = paddle.static.data(name='data_1', shape=[None, 2, 1, 3])
                data_2 = paddle.static.data(name='data_2', shape=[None, 1], dtype='int64')
                data_3 = paddle.static.data(name='data_3', shape=[None, 3, 3], dtype='float32')
                feeder = fluid.DataFeeder(['data_1','data_2', 'data_3'], fluid.CPUPlace())


                result = feeder.feed(reader())
                print(result['data_1'])
                print(result['data_2'])
                print(result['data_3'])

        """
        converter = []
        for lod_level, shape, dtype in zip(
            self.feed_lod_level, self.feed_shapes, self.feed_dtypes
        ):
            converter.append(
                DataToLoDTensorConverter(
                    place=self.place,
                    lod_level=lod_level,
                    shape=shape,
                    dtype=dtype,
                )
            )

        for each_sample in iterable:
            assert len(each_sample) == len(converter), (
                "The number of fields in data (%d) does not match "
                + "len(feed_list) (%d)"
            ) % (len(each_sample), len(converter))
            for each_converter, each_slot in zip(converter, each_sample):
                each_converter.feed(each_slot)
        ret_dict = {}
        for each_name, each_converter in zip(self.feed_names, converter):
            ret_dict[each_name] = each_converter.done()
        return ret_dict

    def _get_number_of_places_(self, num_places):
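        # Determine how many devices to feed data to: an explicit num_places
        # wins; otherwise use the number of visible CUDA devices for a
        # CUDAPlace, or the number of CPUs for other places.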
        if num_places is not None:
            return int(num_places)
        elif isinstance(self.place, core.CUDAPlace):
            return len(_cuda_ids())
        else:
            return _cpu_num()