From 33318acdbb57807bf40840c40214a4fbe7e2ff2b Mon Sep 17 00:00:00 2001 From: liuwei1031 <46661762+liuwei1031@users.noreply.github.com> Date: Sat, 12 Oct 2019 16:05:44 +0800 Subject: [PATCH] Cherry pick (#20515) (#20536) (#20568) (#20573) * improve the doc of data feeder related APIs (#20515) * improve data feeder related API * fix doc of default_main_program, multiprocess_reader (#20536) * fix doc of default_main_program, multiprocess_reader * update API.spec * fix comment * update data feeder API sample, change fluid.layers.data to fluid.data (#20568) * update data feeder API sample, fluid.layers.data => fluid.data * update API.spec * update API.spec --- paddle/fluid/API.spec | 12 +- python/paddle/fluid/data_feeder.py | 260 +++++++++++++++-------------- python/paddle/fluid/framework.py | 28 ++-- python/paddle/reader/decorator.py | 86 ++++++++-- 4 files changed, 230 insertions(+), 156 deletions(-) diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index f9dfeb4d2ec..7b7347bdaa2 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -8,7 +8,7 @@ paddle.fluid.Program.list_vars (ArgSpec(args=['self'], varargs=None, keywords=No paddle.fluid.Program.parse_from_string (ArgSpec(args=['binary_str'], varargs=None, keywords=None, defaults=None), ('document', 'fc4a5660ff4280278402688f0014ce7f')) paddle.fluid.Program.to_string (ArgSpec(args=['self', 'throw_on_error', 'with_details'], varargs=None, keywords=None, defaults=(False,)), ('document', '7dde33f16b63aa50d474870a9cebb539')) paddle.fluid.default_startup_program (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', 'f53890b2fb8c0642b6047e4fee2d6d58')) -paddle.fluid.default_main_program (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', '853718df675e59aea7104f3d61bbf11d')) +paddle.fluid.default_main_program (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', '082aa471d247bd8d7c87814105439e1a')) paddle.fluid.program_guard (ArgSpec(args=['main_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)), ('document', '78fb5c7f70ef76bcf4a1862c3f6b8191')) paddle.fluid.name_scope (ArgSpec(args=['prefix'], varargs=None, keywords=None, defaults=(None,)), ('document', '907a5f877206079d8e67ae69b06bb3ba')) paddle.fluid.cuda_places (ArgSpec(args=['device_ids'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ab9bd2079536114aa7c1488a489ee87f')) @@ -101,7 +101,7 @@ paddle.fluid.io.chain (ArgSpec(args=[], varargs='readers', keywords=None, defaul paddle.fluid.io.shuffle (ArgSpec(args=['reader', 'buf_size'], varargs=None, keywords=None, defaults=None), ('document', '961d0a950cc837c8b13577301dee7bd8')) paddle.fluid.io.firstn (ArgSpec(args=['reader', 'n'], varargs=None, keywords=None, defaults=None), ('document', 'db83c761a5530a05c1ffe2f6f78198f4')) paddle.fluid.io.xmap_readers (ArgSpec(args=['mapper', 'reader', 'process_num', 'buffer_size', 'order'], varargs=None, keywords=None, defaults=(False,)), ('document', '17a1d4e59c4260a9416ff269c5e347a3')) -paddle.fluid.io.multiprocess_reader (ArgSpec(args=['readers', 'use_pipe', 'queue_size'], varargs=None, keywords=None, defaults=(True, 1000)), ('document', '7d8b3a96e592107c893d5d51ce968ba0')) +paddle.fluid.io.multiprocess_reader (ArgSpec(args=['readers', 'use_pipe', 'queue_size'], varargs=None, keywords=None, defaults=(True, 1000)), ('document', '1749709ed7aeb08c1269f58d92ea13e0')) paddle.fluid.initializer.ConstantInitializer ('paddle.fluid.initializer.ConstantInitializer', ('document', '911263fc30c516c55e89cd72086a23f8')) paddle.fluid.initializer.ConstantInitializer.__init__ (ArgSpec(args=['self', 'value', 'force_cpu'], varargs=None, keywords=None, defaults=(0.0, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.initializer.UniformInitializer ('paddle.fluid.initializer.UniformInitializer', ('document', '264e7794745ec36cf826a6f243027db7')) @@ -1100,11 +1100,11 @@ paddle.fluid.ParamAttr ('paddle.fluid.param_attr.ParamAttr', ('document', '7b5bf paddle.fluid.ParamAttr.__init__ (ArgSpec(args=['self', 'name', 'initializer', 'learning_rate', 'regularizer', 'trainable', 'gradient_clip', 'do_model_average'], varargs=None, keywords=None, defaults=(None, None, 1.0, None, True, None, True)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.WeightNormParamAttr ('paddle.fluid.param_attr.WeightNormParamAttr', ('document', 'ea029ec9e0dea75f136211c433154f25')) paddle.fluid.WeightNormParamAttr.__init__ (ArgSpec(args=['self', 'dim', 'name', 'initializer', 'learning_rate', 'regularizer', 'trainable', 'gradient_clip', 'do_model_average'], varargs=None, keywords=None, defaults=(None, None, None, 1.0, None, True, None, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.DataFeeder ('paddle.fluid.data_feeder.DataFeeder', ('document', 'd9e64be617bd5f49dbb08ac2bc8665e6')) +paddle.fluid.DataFeeder ('paddle.fluid.data_feeder.DataFeeder', ('document', 'ac4eff522fe9929d1d74099cd9ad813e')) paddle.fluid.DataFeeder.__init__ (ArgSpec(args=['self', 'feed_list', 'place', 'program'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.DataFeeder.decorate_reader (ArgSpec(args=['self', 'reader', 'multi_devices', 'num_places', 'drop_last'], varargs=None, keywords=None, defaults=(None, True)), ('document', 'a0ed5ce816b5d603cb595aacb922335a')) -paddle.fluid.DataFeeder.feed (ArgSpec(args=['self', 'iterable'], varargs=None, keywords=None, defaults=None), ('document', 'ce65fe1d81dcd7067d5092a5667f35cc')) -paddle.fluid.DataFeeder.feed_parallel (ArgSpec(args=['self', 'iterable', 'num_places'], varargs=None, keywords=None, defaults=(None,)), ('document', '334c6af750941a4397a2dd2ea8a4d76f')) +paddle.fluid.DataFeeder.decorate_reader (ArgSpec(args=['self', 'reader', 'multi_devices', 'num_places', 'drop_last'], varargs=None, keywords=None, defaults=(None, True)), ('document', '7b3363c6ee7565881261e9f4ba7442c9')) +paddle.fluid.DataFeeder.feed (ArgSpec(args=['self', 'iterable'], varargs=None, keywords=None, defaults=None), ('document', '4db18611f5f865c63c633a5edab71992')) +paddle.fluid.DataFeeder.feed_parallel (ArgSpec(args=['self', 'iterable', 'num_places'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b1ecc829c904d4be2e07e2895ef376a6')) paddle.fluid.clip.set_gradient_clip (ArgSpec(args=['clip', 'param_list', 'program'], varargs=None, keywords=None, defaults=(None, None)), ('document', '7a0f76a77dd88a74f24485a103a22fc1')) paddle.fluid.clip.ErrorClipByValue ('paddle.fluid.clip.ErrorClipByValue', ('document', '629b07558971a8ab5e954d9a77457656')) paddle.fluid.clip.ErrorClipByValue.__init__ (ArgSpec(args=['self', 'max', 'min'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) diff --git a/python/paddle/fluid/data_feeder.py b/python/paddle/fluid/data_feeder.py index b377b32636c..496f7a8ada5 100644 --- a/python/paddle/fluid/data_feeder.py +++ b/python/paddle/fluid/data_feeder.py @@ -152,82 +152,59 @@ class BatchedTensorProvider(object): class DataFeeder(object): """ DataFeeder converts the data that returned by a reader into a data - structure that can feed into Executor and ParallelExecutor. The reader - usually returns a list of mini-batch data entries. Each data entry in - the list is one sample. Each sample is a list or a tuple with one - feature or multiple features. - - The simple usage shows below: - - .. code-block:: python - - import paddle.fluid as fluid - place = fluid.CPUPlace() - img = fluid.layers.data(name='image', shape=[1, 28, 28]) - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - feeder = fluid.DataFeeder([img, label], fluid.CPUPlace()) - result = feeder.feed([([0] * 784, [9]), ([1] * 784, [1])]) - - - If you want to feed data into GPU side separately in advance when you - use multi-GPU to train a model, you can use `decorate_reader` function. - - .. code-block:: python - - import paddle - import paddle.fluid as fluid - - place=fluid.CUDAPlace(0) - data = fluid.layers.data(name='data', shape=[3, 224, 224], dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') - - feeder = fluid.DataFeeder(place=place, feed_list=[data, label]) - reader = feeder.decorate_reader( - paddle.batch(paddle.dataset.flowers.train(), batch_size=16), multi_devices=True) - - Args: - feed_list(list): The Variables or Variables'name that will - feed into model. - place(Place): place indicates feed data into CPU or GPU, if you want to - feed data into GPU, please using `fluid.CUDAPlace(i)` (`i` represents - the GPU id), or if you want to feed data into CPU, please using - `fluid.CPUPlace()`. - program(Program): The Program that will feed data into, if program - is None, it will use default_main_program(). Default None. + structure that can feed into Executor. The reader is usually a + python generator that returns a list of mini-batch data entries. + + Parameters: + feed_list (list): Variables or names of Variables that need + to feed. + place (:ref:`api_fluid_CPUPlace` | :ref:`api_fluid_CUDAPlace` ): + place indicates the device (CPU | GPU) the data will be fed into, if + you want to feed data into GPU, please using :code:`fluid.CUDAPlace(i)` + (:code:`i` represents the GPU id), or if you want to feed data into CPU, + please using :code:`fluid.CPUPlace()`. + program (:ref:`api_fluid_Program` , optional): The Program that will + feed data into, if program is None, it will use default_main_program(). + Default None. Raises: - ValueError: If some Variable is not in this Program. + :code:`ValueError` - If some Variables are not in this Program. - Examples: + Example: .. code-block:: python - import numpy as np import paddle import paddle.fluid as fluid place = fluid.CPUPlace() - def reader(): - yield [np.random.random([4]).astype('float32'), np.random.random([3]).astype('float32')], + for _ in range(4): + yield np.random.random([4]).astype('float32'), np.random.random([3]).astype('float32'), main_program = fluid.Program() startup_program = fluid.Program() with fluid.program_guard(main_program, startup_program): - data_1 = fluid.layers.data(name='data_1', shape=[1, 2, 2]) - data_2 = fluid.layers.data(name='data_2', shape=[1, 1, 3]) + data_1 = fluid.data(name='data_1', shape=[None, 2, 2], dtype='float32') + data_2 = fluid.data(name='data_2', shape=[None, 1, 3], dtype='float32') out = fluid.layers.fc(input=[data_1, data_2], size=2) # ... - feeder = fluid.DataFeeder([data_1, data_2], place) - + exe = fluid.Executor(place) exe.run(startup_program) - for data in reader(): - outs = exe.run(program=main_program, - feed=feeder.feed(data), - fetch_list=[out]) + + feed_data = feeder.feed(reader()) + + # print feed_data to view feed results + # print(feed_data['data_1']) + # print(feed_data['data_2']) + + outs = exe.run(program=main_program, + feed=feed_data, + fetch_list=[out]) + print(outs) """ @@ -252,31 +229,41 @@ class DataFeeder(object): def feed(self, iterable): """ - According to feed_list and iterable, converters the input into - a data structure that can feed into Executor and ParallelExecutor. + According to :code:`feed_list` of :code:`DataFeeder` and :code:`iterable` , converts + the input into a data structure that can feed into Executor. - Args: - iterable(list|tuple): the input data. + Parameters: + iterable (generator): user defined python generator to read the raw input data - Returns: - dict: the result of conversion. + Returns: + :code:`dict`: a :code:`dict` that contains (variable name - converted tensor) pairs - Examples: + Example: .. code-block:: python - import numpy.random as random + # In this example, reader - generator will return a list of ndarray of 3 elements + # feed API will convert each ndarray input into a tensor + # the return result is a dict with keys: data_1, data_2, data_3 + # result['data_1'] a LoD-Tensor with shape of [5, 2, 1, 3]. 5 is batch size, and [2, 1, 3] is the real shape of data_1. + # result['data_2'], result['data_3'] are similar. + import numpy as np import paddle.fluid as fluid def reader(limit=5): - for i in range(limit): - yield random.random([784]).astype('float32'), random.random([1]).astype('int64'), random.random([256]).astype('float32') + for i in range(1, limit + 1): + yield np.ones([6]).astype('float32') * i , np.ones([1]).astype('int64') * i, np.random.random([9]).astype('float32') - data_1 = fluid.layers.data(name='data_1', shape=[1, 28, 28]) - data_2 = fluid.layers.data(name='data_2', shape=[1], dtype='int64') - data_3 = fluid.layers.data(name='data_3', shape=[16, 16], dtype='float32') + data_1 = fluid.data(name='data_1', shape=[None, 2, 1, 3]) + data_2 = fluid.data(name='data_2', shape=[None, 1], dtype='int64') + data_3 = fluid.data(name='data_3', shape=[None, 3, 3], dtype='float32') feeder = fluid.DataFeeder(['data_1','data_2', 'data_3'], fluid.CPUPlace()) - result = feeder.feed(reader()) + + result = feeder.feed(reader()) + print(result['data_1']) + print(result['data_2']) + print(result['data_3']) + """ converter = [] for lod_level, shape, dtype in six.moves.zip( @@ -303,46 +290,57 @@ class DataFeeder(object): def feed_parallel(self, iterable, num_places=None): """ - Takes multiple mini-batches. Each mini-batch will be feed on each - device in advance. + Similar with feed function, feed_parallel is used with multiple devices (CPU|GPU). + Here :code:`iterable` is a list of python generators. The data return by each + generator in the list will be fed into a seperate device. - Args: - iterable(list|tuple): the input data. - num_places(int): the number of devices. Default None. + Parameters: + iterable (list|tuple): list of user-defined python geneators. The element + number should match the :code:`num_places`. + num_places (int, optional): the number of devices. If not provided (None), + all available devices on the machine will be used. Default None. - Returns: - dict: the result of conversion. + Returns: + :code:`generator`: a :code:`generator` that generate dict which contains (variable name - converted tensor) pairs, + the total number of dicts will be generated matches with the :code:`num_places` - Notes: - The number of devices and number of mini-batches must be same. + .. note:: + The number of devices - :code:`num_places` should equal to the generator (element of :code:`iterable` ) number - Examples: + Example: .. code-block:: python - import numpy.random as random + import numpy as np import paddle.fluid as fluid - - def reader(limit=10): - for i in range(limit): - yield [random.random([784]).astype('float32'), random.random([1]).astype('float32')], - - x = fluid.layers.data(name='x', shape=[1, 28, 28]) - y = fluid.layers.data(name='y', shape=[1], dtype='float32') - - fluid.layers.elementwise_add(x, y) - + + def generate_reader(batch_size, base=0, factor=1): + def _reader(): + for i in range(batch_size): + yield np.ones([4]) * factor + base, np.ones([4]) * factor + base + 5 + return _reader() + + x = fluid.data(name='x', shape=[None, 2, 2]) + y = fluid.data(name='y', shape=[None, 2, 2], dtype='float32') + + z = fluid.layers.elementwise_add(x, y) + feeder = fluid.DataFeeder(['x','y'], fluid.CPUPlace()) - place_num = 2 + place_num = 2 places = [fluid.CPUPlace() for x in range(place_num)] data = [] exe = fluid.Executor(fluid.CPUPlace()) exe.run(fluid.default_startup_program()) program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(places=places) - for item in reader(): - data.append(item) - if place_num == len(data): - exe.run(program=program, feed=list(feeder.feed_parallel(data, place_num)), fetch_list=[]) - data = [] + + # print sample feed_parallel r resultt + # for item in list(feeder.feed_parallel([generate_reader(5, 0, 1), generate_reader(3, 10, 2)], 2)): + # print(item['x']) + # print(item['y']) + + reader_list = [generate_reader(5, 0, 1), generate_reader(3, 10, 2)] + res = exe.run(program=program, feed=list(feeder.feed_parallel(reader_list, 2)), fetch_list=[z]) + print(res) + """ if isinstance(self.place, core.CUDAPlace): places = [ @@ -383,52 +381,64 @@ class DataFeeder(object): num_places=None, drop_last=True): """ - Converter the input data into a data that returned by reader into - multiple mini-batches. Each mini-batch will be feed on each device. - - Args: - reader(function): the reader is the function which can generate data. - multi_devices(bool): whether to use multiple devices or not. - num_places(int): if multi_devices is True, you can specify the number - of GPU to use, if multi_devices is None, the function will use all the - GPU of the current machine. Default None. - drop_last(bool): whether to drop the last batch if the - size of the last batch is less than batch_size. Default True. - - Returns: - dict: the result of conversion. - + Decorate the reader (generator) to fit multiple devices. The reader generate + multiple mini-batches. Each mini-batch will be fed into a single device. + + Parameters: + reader(generator): a user defined python generator used to get :code:`mini-batch` of data. + A :code:`mini-batch` can be regarded as a python generator that returns batchs of input + entities, just like the below :code:`_mini_batch` in the code example. + multi_devices(bool): indicate whether to use multiple devices or not. + num_places(int, optional): if :code:`multi_devices` is True, you can specify the number + of devices(CPU|GPU) to use, if multi_devices is None, the function will use all the + devices of the current machine. Default None. + drop_last(bool, optional): whether to drop the last round of data if it is not enough to + feed all devices. Default True. + + Returns: + :code:`generator`: a new :code:`generator` which return converted dicts that can be fed into Executor + Raises: - ValueError: If drop_last is False and the data batch cannot fit for devices. + :code:`ValueError`: If drop_last is False and the data cannot fit devices perfectly. - Examples: + Example: .. code-block:: python - import numpy.random as random + import numpy as np import paddle import paddle.fluid as fluid import paddle.fluid.compiler as compiler - def reader(limit=10): - for i in range(limit): - yield (random.random([784]).astype('float32'), random.random([1]).astype('int64')), + def reader(): + def _mini_batch(batch_size): + for i in range(batch_size): + yield np.random.random([16]).astype('float32'), np.random.randint(10, size=[1]) + + for _ in range(10): + yield _mini_batch(np.random.randint(1, 10)) - place=fluid.CUDAPlace(0) - data = fluid.layers.data(name='data', shape=[1, 28, 28], dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') + place_num = 3 + places = [fluid.CPUPlace() for _ in range(place_num)] + # a simple network sample + data = fluid.data(name='data', shape=[None, 4, 4], dtype='float32') + label = fluid.data(name='label', shape=[None, 1], dtype='int64') hidden = fluid.layers.fc(input=data, size=10) - feeder = fluid.DataFeeder(place=place, feed_list=[data, label]) - reader = feeder.decorate_reader(reader, multi_devices=True) + feeder = fluid.DataFeeder(place=places[0], feed_list=[data, label]) + reader = feeder.decorate_reader(reader, multi_devices=True, num_places=3, drop_last=True) - exe = fluid.Executor(place) + exe = fluid.Executor(places[0]) exe.run(fluid.default_startup_program()) compiled_prog = compiler.CompiledProgram( - fluid.default_main_program()).with_data_parallel() + fluid.default_main_program()).with_data_parallel(places=places) + for i,data in enumerate(reader()): - print('iteration : ', i + 1) + # print data if you like + # print(i, data) ret = exe.run(compiled_prog, feed=data, fetch_list=[hidden]) + print(ret) + """ def __reader_creator__(): diff --git a/python/paddle/fluid/framework.py b/python/paddle/fluid/framework.py index 52a8dc61095..939f01cb28a 100644 --- a/python/paddle/fluid/framework.py +++ b/python/paddle/fluid/framework.py @@ -4426,27 +4426,29 @@ def default_startup_program(): def default_main_program(): """ - Get default/global main program. The main program is used for training or - testing. - - All layer function in :code:`fluid.layers` will append operators and - variables to the :code:`default_main_program`. + This API can be used to get ``default main program`` which store the + descriptions of ``op`` and ``variable``. + + For example ``z = fluid.layers.elementwise_add(x, y)`` will create a new ``elementwise_add`` + ``op`` and a new ``z`` ``variable``, and they will be recorded in ``default main program`` - The :code:`default_main_program` is the default program in a lot of APIs. - For example, the :code:`Executor.run()` will execute the + The ``default_main_program`` is the default value for ``Program`` parameter in + a lot of ``fluid`` APIs. For example, the :code:`Executor.run()` will execute the :code:`default_main_program` when the program is not specified. + If you want to replace the ``default main program``, you can use :ref:`api_fluid_program_guard` + Returns: - Program: main program + :ref:`api_fluid_Program`: a ``Program`` which holding the descriptions of ops and variables in the network. Examples: .. code-block:: python import paddle.fluid as fluid - + # Sample Network: - data = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32') - label = fluid.layers.data(name='label', shape=[1], dtype='int64') + data = fluid.data(name='image', shape=[None, 3, 224, 224], dtype='float32') + label = fluid.data(name='label', shape=[None, 1], dtype='int64') conv1 = fluid.layers.conv2d(data, 4, 5, 1, act=None) bn1 = fluid.layers.batch_norm(conv1, act='relu') @@ -4466,8 +4468,12 @@ def default_main_program(): regularization=fluid.regularizer.L2Decay(1e-4)) opt.minimize(loss) + #print the number of blocks in the program, 1 in this case print(fluid.default_main_program().num_blocks) + + #print the description of variable 'image' print(fluid.default_main_program().blocks[0].var('image')) + """ return _main_program_ diff --git a/python/paddle/reader/decorator.py b/python/paddle/reader/decorator.py index 8afbb2a5440..d40585620f8 100644 --- a/python/paddle/reader/decorator.py +++ b/python/paddle/reader/decorator.py @@ -511,26 +511,84 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False): def multiprocess_reader(readers, use_pipe=True, queue_size=1000): """ - multiprocess_reader use python multi process to read data from readers - and then use multiprocess.Queue or multiprocess.Pipe to merge all - data. The process number is equal to the number of input readers, each - process call one reader. + This API use python ``multiprocessing`` to read data from ``readers`` parallelly, + and then ``multiprocess.Queue`` or ``multiprocess.Pipe`` is used to merge + these data. A seperate process will be created for each reader in the + ``readers`` list, please guarantee every reader can work independently + to avoid conflicts in parallel environment. + + + ``Multiprocess.Queue`` require the rw access right to /dev/shm, and it's not suppported + in some platforms. - Multiprocess.Queue require the rw access right to /dev/shm, some - platform does not support. + Parameters: + readers (list( ``generator`` ) | tuple( ``generator`` )): a python ``generator`` list + used to read input data + use_pipe (bool, optional): control the inner API used to implement the multi-processing, + default True - use ``multiprocess.Pipe`` which is recommended + queue_size (int, optional): only useful when ``use_pipe`` is False - ``multiprocess.Queue`` + is used, default 1000. Increase this value can speed up the data reading, and more memory + will be consumed. - you need to create multiple readers first, these readers should be independent - to each other so that each process can work independently. + Returns: + ``generator``: a new reader which can be run parallelly - An example: + + Example: .. code-block:: python - reader0 = reader(["file01", "file02"]) - reader1 = reader(["file11", "file12"]) - reader1 = reader(["file21", "file22"]) - reader = multiprocess_reader([reader0, reader1, reader2], - queue_size=100, use_pipe=False) + import paddle.fluid as fluid + from paddle.fluid.io import multiprocess_reader + import numpy as np + + sample_files = ['sample_file_1', 'sample_file_2'] + + def fake_input_files(): + with open(sample_files[0], 'w') as f: + np.savez(f, a=np.array([1, 2]), b=np.array([3, 4]), c=np.array([5, 6]), d=np.array([7, 8])) + with open(sample_files[1], 'w') as f: + np.savez(f, a=np.array([9, 10]), b=np.array([11, 12]), c=np.array([13, 14])) + + + def generate_reader(file_name): + # load data file + def _impl(): + data = np.load(file_name) + for item in sorted(data.files): + yield data[item], + return _impl + + if __name__ == '__main__': + # generate sample input files + fake_input_files() + + with fluid.program_guard(fluid.Program(), fluid.Program()): + place = fluid.CPUPlace() + # the 1st 2 is batch size + image = fluid.data(name='image', dtype='int64', shape=[2, 1, 2]) + fluid.layers.Print(image) + # print detailed tensor info of image variable + + reader = fluid.io.PyReader(feed_list=[image], capacity=2) + + decorated_reader = multiprocess_reader( + [generate_reader(sample_files[0]), generate_reader(sample_files[1])], False) + + reader.decorate_sample_generator(decorated_reader, batch_size=2, places=[place]) + + exe = fluid.Executor(place) + exe.run(fluid.default_startup_program()) + + for data in reader(): + res = exe.run(feed=data, fetch_list=[image]) + print(res[0]) + # print below content in this case + # [[[1 2]], [[3 4]]] + # [[[5 6]], [[7 8]]] + # [[[9 10]], [[11 12]]] + # [13,14] will be dropped + """ try: -- GitLab