diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index db3739e65bef8b708ce3d1bda70a3eed454ab94d..746b1eecfe2a09b9646a07f81086e02ead77fa3a 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -61,8 +61,9 @@ paddle.fluid.io.load_params ArgSpec(args=['executor', 'dirname', 'main_program', paddle.fluid.io.load_persistables ArgSpec(args=['executor', 'dirname', 'main_program', 'filename'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.io.save_inference_model ArgSpec(args=['dirname', 'feeded_var_names', 'target_vars', 'executor', 'main_program', 'model_filename', 'params_filename', 'export_for_deployment'], varargs=None, keywords=None, defaults=(None, None, None, True)) paddle.fluid.io.load_inference_model ArgSpec(args=['dirname', 'executor', 'model_filename', 'params_filename', 'pserver_endpoints'], varargs=None, keywords=None, defaults=(None, None, None)) -paddle.fluid.io.PyReader.__init__ ArgSpec(args=['self', 'feed_list', 'capacity', 'use_double_buffer', 'iterable'], varargs=None, keywords=None, defaults=(True, True)) +paddle.fluid.io.PyReader.__init__ ArgSpec(args=['self', 'feed_list', 'capacity', 'use_double_buffer', 'iterable'], varargs=None, keywords=None, defaults=(True, False)) paddle.fluid.io.PyReader.decorate_paddle_reader ArgSpec(args=['self', 'reader', 'places'], varargs=None, keywords=None, defaults=(None,)) +paddle.fluid.io.PyReader.decorate_sample_generator ArgSpec(args=['self', 'sample_generator', 'batch_size', 'drop_last', 'places'], varargs=None, keywords=None, defaults=(True, None)) paddle.fluid.io.PyReader.decorate_tensor_provider ArgSpec(args=['self', 'reader', 'places'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.io.PyReader.reset ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.io.PyReader.start ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) diff --git a/paddle/fluid/operators/reader/blocking_queue.h b/paddle/fluid/operators/reader/blocking_queue.h index 7962c0332dbf53964090177b5db0f186d63406f2..78d238aa6115265023d5d87c01048a87180448d0 100644 --- a/paddle/fluid/operators/reader/blocking_queue.h +++ b/paddle/fluid/operators/reader/blocking_queue.h @@ -16,6 +16,7 @@ #include // NOLINT #include +#include #include "paddle/fluid/platform/enforce.h" diff --git a/paddle/fluid/operators/reader/buffered_reader.cc b/paddle/fluid/operators/reader/buffered_reader.cc index b8c98ff5e769c710e4bfe0c64cafe13e2eae3a53..c9962b4ac2dd6bcb133aeda505e1428b56b37551 100644 --- a/paddle/fluid/operators/reader/buffered_reader.cc +++ b/paddle/fluid/operators/reader/buffered_reader.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/operators/reader/buffered_reader.h" +#include #include #include "paddle/fluid/framework/data_type.h" diff --git a/paddle/fluid/operators/reader/buffered_reader.h b/paddle/fluid/operators/reader/buffered_reader.h index 6b21de0949cc4de681bef99bc5396974d1f700d0..5f8b2d47c22d0a15d53c8d30d39608fd64d4bddd 100644 --- a/paddle/fluid/operators/reader/buffered_reader.h +++ b/paddle/fluid/operators/reader/buffered_reader.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include #include "ThreadPool.h" diff --git a/paddle/fluid/operators/reader/py_reader.cc b/paddle/fluid/operators/reader/py_reader.cc index f2c28c1df89bcb8109fea7859278799ad3a0e2ed..155ae859defcf20a5e226a4abfb99dc308dfb23c 100644 --- a/paddle/fluid/operators/reader/py_reader.cc +++ b/paddle/fluid/operators/reader/py_reader.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/operators/reader/py_reader.h" +#include namespace paddle { namespace operators { diff --git a/paddle/fluid/operators/reader/py_reader.h b/paddle/fluid/operators/reader/py_reader.h index 7d760eca64fb3122989acdf710feb4c1af42fb06..43079075142e8db22c0e3b7c86de4249d447f961 100644 --- a/paddle/fluid/operators/reader/py_reader.h +++ b/paddle/fluid/operators/reader/py_reader.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include #include "paddle/fluid/framework/reader.h" #include "paddle/fluid/operators/reader/lod_tensor_blocking_queue.h" diff --git a/paddle/fluid/pybind/reader_py.cc b/paddle/fluid/pybind/reader_py.cc index 8af049031046c365cadd4e62305414a87a7a5ec7..af7d30552ed47c0fbe26090b328cc7128b90f84d 100644 --- a/paddle/fluid/pybind/reader_py.cc +++ b/paddle/fluid/pybind/reader_py.cc @@ -13,7 +13,10 @@ // limitations under the License. #include "paddle/fluid/pybind/reader_py.h" +#include #include +#include +#include #include #include "paddle/fluid/framework/reader.h" #include "paddle/fluid/operators/reader/buffered_reader.h" diff --git a/python/paddle/fluid/data_feeder.py b/python/paddle/fluid/data_feeder.py index a24e1d13003d5c42fa9b5a9346d81c8de4ba45c4..83d7cef19c139d1fbd7267dcaae5dcbf44532268 100644 --- a/python/paddle/fluid/data_feeder.py +++ b/python/paddle/fluid/data_feeder.py @@ -26,6 +26,24 @@ from .framework import Variable, default_main_program __all__ = ['DataFeeder'] +def convert_dtype(dtype): + if dtype == core.VarDesc.VarType.FP32: + return 'float32' + elif dtype == core.VarDesc.VarType.INT64: + return 'int64' + elif dtype == core.VarDesc.VarType.FP64: + return 'float64' + elif dtype == core.VarDesc.VarType.FP16: + return 'float16' + elif dtype == core.VarDesc.VarType.INT32: + return 'int32' + elif dtype == core.VarDesc.VarType.UINT8: + return 'uint8' + else: + raise ValueError("dtype must be any of [int32, float32, int64, " + "float64, uint8]") + + class DataToLoDTensorConverter(object): def __init__(self, place, lod_level, shape, dtype): self.place = place @@ -38,27 +56,12 @@ class DataToLoDTensorConverter(object): if negtive_count > 1: self.shape = None break - if dtype == core.VarDesc.VarType.FP32: - self.dtype = 'float32' - elif dtype == core.VarDesc.VarType.INT64: - self.dtype = 'int64' - elif dtype == core.VarDesc.VarType.FP64: - self.dtype = 'float64' - elif dtype == core.VarDesc.VarType.FP16: - self.dtype = 'float16' - elif dtype == core.VarDesc.VarType.INT32: - self.dtype = 'int32' - elif dtype == core.VarDesc.VarType.UINT8: - self.dtype = 'uint8' - else: - raise ValueError("dtype must be any of [int32, float32, int64, " - "float64, uint8]") + self.dtype = convert_dtype(dtype) + self._reset() + def _reset(self): self.data = [] - self.lod = [] - - for i in six.moves.range(lod_level): - self.lod.append([]) + self.lod = [[] for _ in six.moves.range(self.lod_level)] def feed(self, data): self._feed_impl_(data, self.lod, self.lod_level) @@ -88,15 +91,52 @@ class DataToLoDTensorConverter(object): raise ValueError( "Reshape error. What is defined in data layer is {}, but receive {}" .format(self.shape, arr.shape)) - #else: - # self._check_shape(arr.shape) t = core.LoDTensor() t.set(arr, self.place) if self.lod_level > 0: t.set_recursive_sequence_lengths(self.lod) + self._reset() return t +class BatchedTensorProvider(object): + def __init__(self, feed_list, place, batch_size, generator, drop_last): + self.place = place + self.batch_size = batch_size + self.generator = generator + self.converters = [] + self.drop_last = drop_last + + for var in feed_list: + assert var.lod_level == 0, "lod_level must be 0" + self.converters.append( + DataToLoDTensorConverter( + place=self.place, + lod_level=0, + shape=var.shape, + dtype=var.dtype)) + + def _done(self): + return [c.done() for c in self.converters] + + def __call__(self): + idx = 0 + for each_sample in self.generator(): + for each_slot, each_converter in six.moves.zip(each_sample, + self.converters): + each_converter.data.append(each_slot) + + idx += 1 + if idx == self.batch_size: + idx = 0 + yield self._done() + + if not self.drop_last and idx > 0: + yield self._done() + else: + [c._reset() for c in self.converters] + + class DataFeeder(object): """ DataFeeder converts the data that returned by a reader into a data diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index 7d08403d2610dd1355e5f90e30a5b1fd68ed3389..49ea1b83b5d2ae60d5642ad3471f205bad796fe1 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -17,7 +17,7 @@ import six import threading from .framework import Program, Variable, program_guard, default_main_program, default_startup_program from .executor import global_scope -from .data_feeder import DataFeeder +from .data_feeder import DataFeeder, BatchedTensorProvider from .layers.io import monkey_patch_reader_methods, _copy_reader_var_, double_buffer from .unique_name import UniqueNameGenerator @@ -46,7 +46,7 @@ class PyReader(object): feed_list, capacity, use_double_buffer=True, - iterable=True): + iterable=False): """ Create a reader object for data feeding in Python. Data would be prefetched using Python thread and be pushed @@ -269,6 +269,54 @@ class PyReader(object): self._thread.daemon = True self._thread.start() + def decorate_sample_generator(self, + sample_generator, + batch_size, + drop_last=True, + places=None): + ''' + Set the data source of the PyReader object. + + The provided :code:`sample_generator` should be a Python generator, + which yields numpy.ndarray typed data of each sample. + + :code:`places` must be set when the PyReader object is iterable. + + If all inputs have no lods, this method is faster than + :code:`decorate_paddle_reader(paddle.batch(sample_generator, ...))` . + + Args: + sample_generator (generator): Python generator that yields + numpy.ndarray-typed sample data. + batch_size (int): batch size. Must be larger than 0. + drop_last (bool): Whether to drop the last batch when sample number + is less than batch_size. + places (None|list(CUDAPlace)|list(CPUPlace)): place list. Must + be provided when PyReader is iterable. + ''' + assert batch_size > 0, "batch_size must be larger than 0" + has_lod = False + for f in self._feed_list: + if f.lod_level != 0: + has_lod = True + break + + if has_lod: + self.decorate_paddle_reader( + paddle.batch( + sample_generator, + batch_size=batch_size, + drop_last=drop_last), + places=places) + else: + reader = BatchedTensorProvider( + feed_list=self._feed_list, + place=core.CPUPlace(), + batch_size=batch_size, + generator=sample_generator, + drop_last=drop_last) + self.decorate_tensor_provider(reader, places=places) + def decorate_paddle_reader(self, reader, places=None): ''' Set the data source of the PyReader object. @@ -279,8 +327,10 @@ class PyReader(object): :code:`places` must be set when the PyReader object is iterable. Args: - reader (generator): Python generator that yields numpy-typed - batched data. + reader (generator): Python generator that yields + list(numpy.ndarray)-typed batched data. + places (None|list(CUDAPlace)|list(CPUPlace)): place list. Must + be provided when PyReader is iterable. ''' assert self._tensor_reader is None, \ "Cannot reset the data source of PyReader" @@ -307,6 +357,8 @@ class PyReader(object): Args: reader (generator): Python generator that yields LoDTensor-typed batched data. + places (None|list(CUDAPlace)|list(CPUPlace)): place list. Must + be provided when PyReader is iterable. ''' assert self._tensor_reader is None, \ "Cannot reset the data source of PyReader" diff --git a/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py b/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py index 96a11edd496661148e064f41e36024cdb7539bba..7112a57743149d4bed387ac23d6a4892b18724f1 100644 --- a/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py +++ b/python/paddle/fluid/tests/unittests/test_decoupled_py_reader.py @@ -127,7 +127,6 @@ class TestBase(unittest.TestCase): step_list.append(step) end_t = time.time() ret = {"time": end_t - start_t, "step": step_list} - scope._remove_from_pool() return ret def prepare_places(self, with_data_parallel, with_cpu=True, with_gpu=True): diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_sample_generator.py b/python/paddle/fluid/tests/unittests/test_py_reader_sample_generator.py new file mode 100644 index 0000000000000000000000000000000000000000..2f8f0b1b6e5b764728b4b22fa0fcf5369678bcfa --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_py_reader_sample_generator.py @@ -0,0 +1,137 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import paddle +import paddle.fluid as fluid +import math +import unittest +import numpy as np +import os + +os.environ['CPU_NUM'] = '1' + + +def random_reader(sample_num): + def __impl__(): + for _ in range(sample_num): + yield np.random.random( + size=[784]).astype('float32'), np.random.random_integers( + low=0, high=9, size=[1]).astype('int64') + + return paddle.reader.cache(__impl__) + + +class TestCaseBase(unittest.TestCase): + def setUp(self): + self.batch_size = 32 + self.epoch_num = 2 + self.sample_num = 165 + + def generate_all_data(self, reader): + ret = [] + for d in reader(): + slots = [[], []] + for item in d: + slots[0].append(item[0]) + slots[1].append(item[1]) + slots = [np.array(slot) for slot in slots] + ret.append(slots) + return ret + + def run_main(self, reader, use_sample_generator, iterable, drop_last): + image = fluid.layers.data(name='image', dtype='float32', shape=[784]) + label = fluid.layers.data(name='label', dtype='int64', shape=[1]) + py_reader = fluid.io.PyReader( + feed_list=[image, label], + capacity=16, + iterable=iterable, + use_double_buffer=False) + + batch_reader = paddle.batch(reader, self.batch_size, drop_last) + all_datas = self.generate_all_data(batch_reader) + + if not use_sample_generator: + py_reader.decorate_paddle_reader( + batch_reader, places=fluid.cpu_places()) + else: + py_reader.decorate_sample_generator( + reader, self.batch_size, drop_last, places=fluid.cpu_places()) + + if drop_last: + batch_num = int(self.sample_num / self.batch_size) + else: + batch_num = math.ceil(float(self.sample_num) / self.batch_size) + + exe = fluid.Executor(fluid.CPUPlace()) + exe.run(fluid.default_startup_program()) + for _ in range(self.epoch_num): + if py_reader.iterable: + step = 0 + for data in py_reader(): + img, lbl = exe.run(feed=data, fetch_list=[image, label]) + self.assertArrayEqual(img, all_datas[step][0]) + self.assertArrayEqual(lbl, all_datas[step][1]) + step += 1 + self.assertEqual(step, len(all_datas)) + else: + step = 0 + try: + py_reader.start() + while True: + img, lbl = exe.run(fetch_list=[image, label]) + self.assertArrayEqual(img, all_datas[step][0]) + self.assertArrayEqual(lbl, all_datas[step][1]) + step += 1 + except fluid.core.EOFException: + py_reader.reset() + self.assertEqual(step, len(all_datas)) + break + + def assertArrayEqual(self, arr1, arr2): + self.assertEqual(arr1.shape, arr2.shape) + self.assertTrue((arr1 == arr2).all()) + + def test_main(self): + reader = random_reader(self.sample_num) + for use_sample_generator in [False, True]: + for iterable in [False, True]: + for drop_last in [False, True]: + with fluid.program_guard(fluid.Program(), fluid.Program()): + self.run_main(reader, use_sample_generator, iterable, + drop_last) + + +class TestCase1(TestCaseBase): + def setUp(self): + self.batch_size = 32 + self.epoch_num = 10 + self.sample_num = 160 + + +class TestCase2(TestCaseBase): + def setUp(self): + self.batch_size = 32 + self.epoch_num = 2 + self.sample_num = 200 + + +class TestCase3(TestCaseBase): + def setUp(self): + self.batch_size = 32 + self.epoch_num = 2 + self.sample_num = 159 + + +if __name__ == '__main__': + unittest.main()