未验证 提交 33318acd 编写于 作者: L liuwei1031 提交者: GitHub

Cherry pick (#20515) (#20536) (#20568) (#20573)

* improve the doc of data feeder related APIs (#20515)

* improve data feeder related API

* fix doc of default_main_program, multiprocess_reader (#20536)

* fix doc of default_main_program, multiprocess_reader

* update API.spec

* fix comment

* update data feeder API sample, change fluid.layers.data to fluid.data (#20568)

* update data feeder API sample, fluid.layers.data => fluid.data

* update API.spec

* update API.spec
上级 c40f198a
...@@ -8,7 +8,7 @@ paddle.fluid.Program.list_vars (ArgSpec(args=['self'], varargs=None, keywords=No ...@@ -8,7 +8,7 @@ paddle.fluid.Program.list_vars (ArgSpec(args=['self'], varargs=None, keywords=No
paddle.fluid.Program.parse_from_string (ArgSpec(args=['binary_str'], varargs=None, keywords=None, defaults=None), ('document', 'fc4a5660ff4280278402688f0014ce7f')) paddle.fluid.Program.parse_from_string (ArgSpec(args=['binary_str'], varargs=None, keywords=None, defaults=None), ('document', 'fc4a5660ff4280278402688f0014ce7f'))
paddle.fluid.Program.to_string (ArgSpec(args=['self', 'throw_on_error', 'with_details'], varargs=None, keywords=None, defaults=(False,)), ('document', '7dde33f16b63aa50d474870a9cebb539')) paddle.fluid.Program.to_string (ArgSpec(args=['self', 'throw_on_error', 'with_details'], varargs=None, keywords=None, defaults=(False,)), ('document', '7dde33f16b63aa50d474870a9cebb539'))
paddle.fluid.default_startup_program (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', 'f53890b2fb8c0642b6047e4fee2d6d58')) paddle.fluid.default_startup_program (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', 'f53890b2fb8c0642b6047e4fee2d6d58'))
paddle.fluid.default_main_program (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', '853718df675e59aea7104f3d61bbf11d')) paddle.fluid.default_main_program (ArgSpec(args=[], varargs=None, keywords=None, defaults=None), ('document', '082aa471d247bd8d7c87814105439e1a'))
paddle.fluid.program_guard (ArgSpec(args=['main_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)), ('document', '78fb5c7f70ef76bcf4a1862c3f6b8191')) paddle.fluid.program_guard (ArgSpec(args=['main_program', 'startup_program'], varargs=None, keywords=None, defaults=(None,)), ('document', '78fb5c7f70ef76bcf4a1862c3f6b8191'))
paddle.fluid.name_scope (ArgSpec(args=['prefix'], varargs=None, keywords=None, defaults=(None,)), ('document', '907a5f877206079d8e67ae69b06bb3ba')) paddle.fluid.name_scope (ArgSpec(args=['prefix'], varargs=None, keywords=None, defaults=(None,)), ('document', '907a5f877206079d8e67ae69b06bb3ba'))
paddle.fluid.cuda_places (ArgSpec(args=['device_ids'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ab9bd2079536114aa7c1488a489ee87f')) paddle.fluid.cuda_places (ArgSpec(args=['device_ids'], varargs=None, keywords=None, defaults=(None,)), ('document', 'ab9bd2079536114aa7c1488a489ee87f'))
...@@ -101,7 +101,7 @@ paddle.fluid.io.chain (ArgSpec(args=[], varargs='readers', keywords=None, defaul ...@@ -101,7 +101,7 @@ paddle.fluid.io.chain (ArgSpec(args=[], varargs='readers', keywords=None, defaul
paddle.fluid.io.shuffle (ArgSpec(args=['reader', 'buf_size'], varargs=None, keywords=None, defaults=None), ('document', '961d0a950cc837c8b13577301dee7bd8')) paddle.fluid.io.shuffle (ArgSpec(args=['reader', 'buf_size'], varargs=None, keywords=None, defaults=None), ('document', '961d0a950cc837c8b13577301dee7bd8'))
paddle.fluid.io.firstn (ArgSpec(args=['reader', 'n'], varargs=None, keywords=None, defaults=None), ('document', 'db83c761a5530a05c1ffe2f6f78198f4')) paddle.fluid.io.firstn (ArgSpec(args=['reader', 'n'], varargs=None, keywords=None, defaults=None), ('document', 'db83c761a5530a05c1ffe2f6f78198f4'))
paddle.fluid.io.xmap_readers (ArgSpec(args=['mapper', 'reader', 'process_num', 'buffer_size', 'order'], varargs=None, keywords=None, defaults=(False,)), ('document', '17a1d4e59c4260a9416ff269c5e347a3')) paddle.fluid.io.xmap_readers (ArgSpec(args=['mapper', 'reader', 'process_num', 'buffer_size', 'order'], varargs=None, keywords=None, defaults=(False,)), ('document', '17a1d4e59c4260a9416ff269c5e347a3'))
paddle.fluid.io.multiprocess_reader (ArgSpec(args=['readers', 'use_pipe', 'queue_size'], varargs=None, keywords=None, defaults=(True, 1000)), ('document', '7d8b3a96e592107c893d5d51ce968ba0')) paddle.fluid.io.multiprocess_reader (ArgSpec(args=['readers', 'use_pipe', 'queue_size'], varargs=None, keywords=None, defaults=(True, 1000)), ('document', '1749709ed7aeb08c1269f58d92ea13e0'))
paddle.fluid.initializer.ConstantInitializer ('paddle.fluid.initializer.ConstantInitializer', ('document', '911263fc30c516c55e89cd72086a23f8')) paddle.fluid.initializer.ConstantInitializer ('paddle.fluid.initializer.ConstantInitializer', ('document', '911263fc30c516c55e89cd72086a23f8'))
paddle.fluid.initializer.ConstantInitializer.__init__ (ArgSpec(args=['self', 'value', 'force_cpu'], varargs=None, keywords=None, defaults=(0.0, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.initializer.ConstantInitializer.__init__ (ArgSpec(args=['self', 'value', 'force_cpu'], varargs=None, keywords=None, defaults=(0.0, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.initializer.UniformInitializer ('paddle.fluid.initializer.UniformInitializer', ('document', '264e7794745ec36cf826a6f243027db7')) paddle.fluid.initializer.UniformInitializer ('paddle.fluid.initializer.UniformInitializer', ('document', '264e7794745ec36cf826a6f243027db7'))
...@@ -1100,11 +1100,11 @@ paddle.fluid.ParamAttr ('paddle.fluid.param_attr.ParamAttr', ('document', '7b5bf ...@@ -1100,11 +1100,11 @@ paddle.fluid.ParamAttr ('paddle.fluid.param_attr.ParamAttr', ('document', '7b5bf
paddle.fluid.ParamAttr.__init__ (ArgSpec(args=['self', 'name', 'initializer', 'learning_rate', 'regularizer', 'trainable', 'gradient_clip', 'do_model_average'], varargs=None, keywords=None, defaults=(None, None, 1.0, None, True, None, True)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.ParamAttr.__init__ (ArgSpec(args=['self', 'name', 'initializer', 'learning_rate', 'regularizer', 'trainable', 'gradient_clip', 'do_model_average'], varargs=None, keywords=None, defaults=(None, None, 1.0, None, True, None, True)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.WeightNormParamAttr ('paddle.fluid.param_attr.WeightNormParamAttr', ('document', 'ea029ec9e0dea75f136211c433154f25')) paddle.fluid.WeightNormParamAttr ('paddle.fluid.param_attr.WeightNormParamAttr', ('document', 'ea029ec9e0dea75f136211c433154f25'))
paddle.fluid.WeightNormParamAttr.__init__ (ArgSpec(args=['self', 'dim', 'name', 'initializer', 'learning_rate', 'regularizer', 'trainable', 'gradient_clip', 'do_model_average'], varargs=None, keywords=None, defaults=(None, None, None, 1.0, None, True, None, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.WeightNormParamAttr.__init__ (ArgSpec(args=['self', 'dim', 'name', 'initializer', 'learning_rate', 'regularizer', 'trainable', 'gradient_clip', 'do_model_average'], varargs=None, keywords=None, defaults=(None, None, None, 1.0, None, True, None, False)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.DataFeeder ('paddle.fluid.data_feeder.DataFeeder', ('document', 'd9e64be617bd5f49dbb08ac2bc8665e6')) paddle.fluid.DataFeeder ('paddle.fluid.data_feeder.DataFeeder', ('document', 'ac4eff522fe9929d1d74099cd9ad813e'))
paddle.fluid.DataFeeder.__init__ (ArgSpec(args=['self', 'feed_list', 'place', 'program'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.DataFeeder.__init__ (ArgSpec(args=['self', 'feed_list', 'place', 'program'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.DataFeeder.decorate_reader (ArgSpec(args=['self', 'reader', 'multi_devices', 'num_places', 'drop_last'], varargs=None, keywords=None, defaults=(None, True)), ('document', 'a0ed5ce816b5d603cb595aacb922335a')) paddle.fluid.DataFeeder.decorate_reader (ArgSpec(args=['self', 'reader', 'multi_devices', 'num_places', 'drop_last'], varargs=None, keywords=None, defaults=(None, True)), ('document', '7b3363c6ee7565881261e9f4ba7442c9'))
paddle.fluid.DataFeeder.feed (ArgSpec(args=['self', 'iterable'], varargs=None, keywords=None, defaults=None), ('document', 'ce65fe1d81dcd7067d5092a5667f35cc')) paddle.fluid.DataFeeder.feed (ArgSpec(args=['self', 'iterable'], varargs=None, keywords=None, defaults=None), ('document', '4db18611f5f865c63c633a5edab71992'))
paddle.fluid.DataFeeder.feed_parallel (ArgSpec(args=['self', 'iterable', 'num_places'], varargs=None, keywords=None, defaults=(None,)), ('document', '334c6af750941a4397a2dd2ea8a4d76f')) paddle.fluid.DataFeeder.feed_parallel (ArgSpec(args=['self', 'iterable', 'num_places'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b1ecc829c904d4be2e07e2895ef376a6'))
paddle.fluid.clip.set_gradient_clip (ArgSpec(args=['clip', 'param_list', 'program'], varargs=None, keywords=None, defaults=(None, None)), ('document', '7a0f76a77dd88a74f24485a103a22fc1')) paddle.fluid.clip.set_gradient_clip (ArgSpec(args=['clip', 'param_list', 'program'], varargs=None, keywords=None, defaults=(None, None)), ('document', '7a0f76a77dd88a74f24485a103a22fc1'))
paddle.fluid.clip.ErrorClipByValue ('paddle.fluid.clip.ErrorClipByValue', ('document', '629b07558971a8ab5e954d9a77457656')) paddle.fluid.clip.ErrorClipByValue ('paddle.fluid.clip.ErrorClipByValue', ('document', '629b07558971a8ab5e954d9a77457656'))
paddle.fluid.clip.ErrorClipByValue.__init__ (ArgSpec(args=['self', 'max', 'min'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.clip.ErrorClipByValue.__init__ (ArgSpec(args=['self', 'max', 'min'], varargs=None, keywords=None, defaults=(None,)), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
......
...@@ -152,82 +152,59 @@ class BatchedTensorProvider(object): ...@@ -152,82 +152,59 @@ class BatchedTensorProvider(object):
class DataFeeder(object): class DataFeeder(object):
""" """
DataFeeder converts the data that returned by a reader into a data DataFeeder converts the data that returned by a reader into a data
structure that can feed into Executor and ParallelExecutor. The reader structure that can feed into Executor. The reader is usually a
usually returns a list of mini-batch data entries. Each data entry in python generator that returns a list of mini-batch data entries.
the list is one sample. Each sample is a list or a tuple with one
feature or multiple features. Parameters:
feed_list (list): Variables or names of Variables that need
The simple usage shows below: to feed.
place (:ref:`api_fluid_CPUPlace` | :ref:`api_fluid_CUDAPlace` ):
.. code-block:: python place indicates the device (CPU | GPU) the data will be fed into, if
you want to feed data into GPU, please use :code:`fluid.CUDAPlace(i)`
import paddle.fluid as fluid (:code:`i` represents the GPU id), or if you want to feed data into CPU,
place = fluid.CPUPlace() please use :code:`fluid.CPUPlace()`.
img = fluid.layers.data(name='image', shape=[1, 28, 28]) program (:ref:`api_fluid_Program` , optional): The Program that will
label = fluid.layers.data(name='label', shape=[1], dtype='int64') feed data into, if program is None, it will use default_main_program().
feeder = fluid.DataFeeder([img, label], fluid.CPUPlace()) Default None.
result = feeder.feed([([0] * 784, [9]), ([1] * 784, [1])])
If you want to feed data into GPU side separately in advance when you
use multi-GPU to train a model, you can use `decorate_reader` function.
.. code-block:: python
import paddle
import paddle.fluid as fluid
place=fluid.CUDAPlace(0)
data = fluid.layers.data(name='data', shape=[3, 224, 224], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
reader = feeder.decorate_reader(
paddle.batch(paddle.dataset.flowers.train(), batch_size=16), multi_devices=True)
Args:
feed_list(list): The Variables or Variables'name that will
feed into model.
place(Place): place indicates feed data into CPU or GPU, if you want to
feed data into GPU, please using `fluid.CUDAPlace(i)` (`i` represents
the GPU id), or if you want to feed data into CPU, please using
`fluid.CPUPlace()`.
program(Program): The Program that will feed data into, if program
is None, it will use default_main_program(). Default None.
Raises: Raises:
ValueError: If some Variable is not in this Program. :code:`ValueError` - If some Variables are not in this Program.
Examples: Example:
.. code-block:: python .. code-block:: python
import numpy as np import numpy as np
import paddle import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
place = fluid.CPUPlace() place = fluid.CPUPlace()
def reader(): def reader():
yield [np.random.random([4]).astype('float32'), np.random.random([3]).astype('float32')], for _ in range(4):
yield np.random.random([4]).astype('float32'), np.random.random([3]).astype('float32'),
main_program = fluid.Program() main_program = fluid.Program()
startup_program = fluid.Program() startup_program = fluid.Program()
with fluid.program_guard(main_program, startup_program): with fluid.program_guard(main_program, startup_program):
data_1 = fluid.layers.data(name='data_1', shape=[1, 2, 2]) data_1 = fluid.data(name='data_1', shape=[None, 2, 2], dtype='float32')
data_2 = fluid.layers.data(name='data_2', shape=[1, 1, 3]) data_2 = fluid.data(name='data_2', shape=[None, 1, 3], dtype='float32')
out = fluid.layers.fc(input=[data_1, data_2], size=2) out = fluid.layers.fc(input=[data_1, data_2], size=2)
# ... # ...
feeder = fluid.DataFeeder([data_1, data_2], place) feeder = fluid.DataFeeder([data_1, data_2], place)
exe = fluid.Executor(place) exe = fluid.Executor(place)
exe.run(startup_program) exe.run(startup_program)
for data in reader():
outs = exe.run(program=main_program, feed_data = feeder.feed(reader())
feed=feeder.feed(data),
fetch_list=[out]) # print feed_data to view feed results
# print(feed_data['data_1'])
# print(feed_data['data_2'])
outs = exe.run(program=main_program,
feed=feed_data,
fetch_list=[out])
print(outs)
""" """
...@@ -252,31 +229,41 @@ class DataFeeder(object): ...@@ -252,31 +229,41 @@ class DataFeeder(object):
def feed(self, iterable): def feed(self, iterable):
""" """
According to feed_list and iterable, converters the input into According to :code:`feed_list` of :code:`DataFeeder` and :code:`iterable` , converts
a data structure that can feed into Executor and ParallelExecutor. the input into a data structure that can feed into Executor.
Args: Parameters:
iterable(list|tuple): the input data. iterable (generator): user defined python generator to read the raw input data
Returns: Returns:
dict: the result of conversion. :code:`dict`: a :code:`dict` that contains (variable name - converted tensor) pairs
Examples: Example:
.. code-block:: python .. code-block:: python
import numpy.random as random # In this example, reader - generator will return a list of ndarray of 3 elements
# feed API will convert each ndarray input into a tensor
# the return result is a dict with keys: data_1, data_2, data_3
# result['data_1'] a LoD-Tensor with shape of [5, 2, 1, 3]. 5 is batch size, and [2, 1, 3] is the real shape of data_1.
# result['data_2'], result['data_3'] are similar.
import numpy as np
import paddle.fluid as fluid import paddle.fluid as fluid
def reader(limit=5): def reader(limit=5):
for i in range(limit): for i in range(1, limit + 1):
yield random.random([784]).astype('float32'), random.random([1]).astype('int64'), random.random([256]).astype('float32') yield np.ones([6]).astype('float32') * i , np.ones([1]).astype('int64') * i, np.random.random([9]).astype('float32')
data_1 = fluid.layers.data(name='data_1', shape=[1, 28, 28]) data_1 = fluid.data(name='data_1', shape=[None, 2, 1, 3])
data_2 = fluid.layers.data(name='data_2', shape=[1], dtype='int64') data_2 = fluid.data(name='data_2', shape=[None, 1], dtype='int64')
data_3 = fluid.layers.data(name='data_3', shape=[16, 16], dtype='float32') data_3 = fluid.data(name='data_3', shape=[None, 3, 3], dtype='float32')
feeder = fluid.DataFeeder(['data_1','data_2', 'data_3'], fluid.CPUPlace()) feeder = fluid.DataFeeder(['data_1','data_2', 'data_3'], fluid.CPUPlace())
result = feeder.feed(reader())
result = feeder.feed(reader())
print(result['data_1'])
print(result['data_2'])
print(result['data_3'])
""" """
converter = [] converter = []
for lod_level, shape, dtype in six.moves.zip( for lod_level, shape, dtype in six.moves.zip(
...@@ -303,46 +290,57 @@ class DataFeeder(object): ...@@ -303,46 +290,57 @@ class DataFeeder(object):
def feed_parallel(self, iterable, num_places=None): def feed_parallel(self, iterable, num_places=None):
""" """
Takes multiple mini-batches. Each mini-batch will be feed on each Similar to the feed function, feed_parallel is used with multiple devices (CPU|GPU).
device in advance. Here :code:`iterable` is a list of python generators. The data returned by each
generator in the list will be fed into a separate device.
Args: Parameters:
iterable(list|tuple): the input data. iterable (list|tuple): list of user-defined python generators. The element
num_places(int): the number of devices. Default None. number should match the :code:`num_places`.
num_places (int, optional): the number of devices. If not provided (None),
all available devices on the machine will be used. Default None.
Returns: Returns:
dict: the result of conversion. :code:`generator`: a :code:`generator` that generate dict which contains (variable name - converted tensor) pairs,
the total number of dicts will be generated matches with the :code:`num_places`
Notes: .. note::
The number of devices and number of mini-batches must be same. The number of devices - :code:`num_places` should equal to the generator (element of :code:`iterable` ) number
Examples: Example:
.. code-block:: python .. code-block:: python
import numpy.random as random import numpy as np
import paddle.fluid as fluid import paddle.fluid as fluid
def reader(limit=10): def generate_reader(batch_size, base=0, factor=1):
for i in range(limit): def _reader():
yield [random.random([784]).astype('float32'), random.random([1]).astype('float32')], for i in range(batch_size):
yield np.ones([4]) * factor + base, np.ones([4]) * factor + base + 5
x = fluid.layers.data(name='x', shape=[1, 28, 28]) return _reader()
y = fluid.layers.data(name='y', shape=[1], dtype='float32')
x = fluid.data(name='x', shape=[None, 2, 2])
fluid.layers.elementwise_add(x, y) y = fluid.data(name='y', shape=[None, 2, 2], dtype='float32')
z = fluid.layers.elementwise_add(x, y)
feeder = fluid.DataFeeder(['x','y'], fluid.CPUPlace()) feeder = fluid.DataFeeder(['x','y'], fluid.CPUPlace())
place_num = 2 place_num = 2
places = [fluid.CPUPlace() for x in range(place_num)] places = [fluid.CPUPlace() for x in range(place_num)]
data = [] data = []
exe = fluid.Executor(fluid.CPUPlace()) exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(places=places) program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(places=places)
for item in reader():
data.append(item) # print sample feed_parallel result
if place_num == len(data): # for item in list(feeder.feed_parallel([generate_reader(5, 0, 1), generate_reader(3, 10, 2)], 2)):
exe.run(program=program, feed=list(feeder.feed_parallel(data, place_num)), fetch_list=[]) # print(item['x'])
data = [] # print(item['y'])
reader_list = [generate_reader(5, 0, 1), generate_reader(3, 10, 2)]
res = exe.run(program=program, feed=list(feeder.feed_parallel(reader_list, 2)), fetch_list=[z])
print(res)
""" """
if isinstance(self.place, core.CUDAPlace): if isinstance(self.place, core.CUDAPlace):
places = [ places = [
...@@ -383,52 +381,64 @@ class DataFeeder(object): ...@@ -383,52 +381,64 @@ class DataFeeder(object):
num_places=None, num_places=None,
drop_last=True): drop_last=True):
""" """
Converter the input data into a data that returned by reader into Decorate the reader (generator) to fit multiple devices. The reader generates
multiple mini-batches. Each mini-batch will be feed on each device. multiple mini-batches. Each mini-batch will be fed into a single device.
Args: Parameters:
reader(function): the reader is the function which can generate data. reader(generator): a user defined python generator used to get :code:`mini-batch` of data.
multi_devices(bool): whether to use multiple devices or not. A :code:`mini-batch` can be regarded as a python generator that returns batches of input
num_places(int): if multi_devices is True, you can specify the number entities, just like the below :code:`_mini_batch` in the code example.
of GPU to use, if multi_devices is None, the function will use all the multi_devices(bool): indicate whether to use multiple devices or not.
GPU of the current machine. Default None. num_places(int, optional): if :code:`multi_devices` is True, you can specify the number
drop_last(bool): whether to drop the last batch if the of devices(CPU|GPU) to use, if multi_devices is None, the function will use all the
size of the last batch is less than batch_size. Default True. devices of the current machine. Default None.
drop_last(bool, optional): whether to drop the last round of data if it is not enough to
Returns: feed all devices. Default True.
dict: the result of conversion.
Returns:
:code:`generator`: a new :code:`generator` which return converted dicts that can be fed into Executor
Raises: Raises:
ValueError: If drop_last is False and the data batch cannot fit for devices. :code:`ValueError`: If drop_last is False and the data cannot fit devices perfectly.
Examples: Example:
.. code-block:: python .. code-block:: python
import numpy.random as random import numpy as np
import paddle import paddle
import paddle.fluid as fluid import paddle.fluid as fluid
import paddle.fluid.compiler as compiler import paddle.fluid.compiler as compiler
def reader(limit=10): def reader():
for i in range(limit): def _mini_batch(batch_size):
yield (random.random([784]).astype('float32'), random.random([1]).astype('int64')), for i in range(batch_size):
yield np.random.random([16]).astype('float32'), np.random.randint(10, size=[1])
for _ in range(10):
yield _mini_batch(np.random.randint(1, 10))
place=fluid.CUDAPlace(0) place_num = 3
data = fluid.layers.data(name='data', shape=[1, 28, 28], dtype='float32') places = [fluid.CPUPlace() for _ in range(place_num)]
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
# a simple network sample
data = fluid.data(name='data', shape=[None, 4, 4], dtype='float32')
label = fluid.data(name='label', shape=[None, 1], dtype='int64')
hidden = fluid.layers.fc(input=data, size=10) hidden = fluid.layers.fc(input=data, size=10)
feeder = fluid.DataFeeder(place=place, feed_list=[data, label]) feeder = fluid.DataFeeder(place=places[0], feed_list=[data, label])
reader = feeder.decorate_reader(reader, multi_devices=True) reader = feeder.decorate_reader(reader, multi_devices=True, num_places=3, drop_last=True)
exe = fluid.Executor(place) exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program()) exe.run(fluid.default_startup_program())
compiled_prog = compiler.CompiledProgram( compiled_prog = compiler.CompiledProgram(
fluid.default_main_program()).with_data_parallel() fluid.default_main_program()).with_data_parallel(places=places)
for i,data in enumerate(reader()): for i,data in enumerate(reader()):
print('iteration : ', i + 1) # print data if you like
# print(i, data)
ret = exe.run(compiled_prog, feed=data, fetch_list=[hidden]) ret = exe.run(compiled_prog, feed=data, fetch_list=[hidden])
print(ret)
""" """
def __reader_creator__(): def __reader_creator__():
......
...@@ -4426,27 +4426,29 @@ def default_startup_program(): ...@@ -4426,27 +4426,29 @@ def default_startup_program():
def default_main_program(): def default_main_program():
""" """
Get default/global main program. The main program is used for training or This API can be used to get ``default main program`` which store the
testing. descriptions of ``op`` and ``variable``.
All layer function in :code:`fluid.layers` will append operators and For example ``z = fluid.layers.elementwise_add(x, y)`` will create a new ``elementwise_add``
variables to the :code:`default_main_program`. ``op`` and a new ``z`` ``variable``, and they will be recorded in ``default main program``
The :code:`default_main_program` is the default program in a lot of APIs. The ``default_main_program`` is the default value for ``Program`` parameter in
For example, the :code:`Executor.run()` will execute the a lot of ``fluid`` APIs. For example, the :code:`Executor.run()` will execute the
:code:`default_main_program` when the program is not specified. :code:`default_main_program` when the program is not specified.
If you want to replace the ``default main program``, you can use :ref:`api_fluid_program_guard`
Returns: Returns:
Program: main program :ref:`api_fluid_Program`: a ``Program`` that holds the descriptions of ops and variables in the network.
Examples: Examples:
.. code-block:: python .. code-block:: python
import paddle.fluid as fluid import paddle.fluid as fluid
# Sample Network: # Sample Network:
data = fluid.layers.data(name='image', shape=[3, 224, 224], dtype='float32') data = fluid.data(name='image', shape=[None, 3, 224, 224], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64') label = fluid.data(name='label', shape=[None, 1], dtype='int64')
conv1 = fluid.layers.conv2d(data, 4, 5, 1, act=None) conv1 = fluid.layers.conv2d(data, 4, 5, 1, act=None)
bn1 = fluid.layers.batch_norm(conv1, act='relu') bn1 = fluid.layers.batch_norm(conv1, act='relu')
...@@ -4466,8 +4468,12 @@ def default_main_program(): ...@@ -4466,8 +4468,12 @@ def default_main_program():
regularization=fluid.regularizer.L2Decay(1e-4)) regularization=fluid.regularizer.L2Decay(1e-4))
opt.minimize(loss) opt.minimize(loss)
#print the number of blocks in the program, 1 in this case
print(fluid.default_main_program().num_blocks) print(fluid.default_main_program().num_blocks)
#print the description of variable 'image'
print(fluid.default_main_program().blocks[0].var('image')) print(fluid.default_main_program().blocks[0].var('image'))
""" """
return _main_program_ return _main_program_
......
...@@ -511,26 +511,84 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False): ...@@ -511,26 +511,84 @@ def xmap_readers(mapper, reader, process_num, buffer_size, order=False):
def multiprocess_reader(readers, use_pipe=True, queue_size=1000): def multiprocess_reader(readers, use_pipe=True, queue_size=1000):
""" """
multiprocess_reader use python multi process to read data from readers This API uses python ``multiprocessing`` to read data from ``readers`` in parallel,
and then use multiprocess.Queue or multiprocess.Pipe to merge all and then ``multiprocess.Queue`` or ``multiprocess.Pipe`` is used to merge
data. The process number is equal to the number of input readers, each these data. A separate process will be created for each reader in the
process call one reader. ``readers`` list, please guarantee every reader can work independently
to avoid conflicts in parallel environment.
``Multiprocess.Queue`` requires the rw access right to /dev/shm, and it's not supported
in some platforms.
Multiprocess.Queue require the rw access right to /dev/shm, some Parameters:
platform does not support. readers (list( ``generator`` ) | tuple( ``generator`` )): a python ``generator`` list
used to read input data
use_pipe (bool, optional): control the inner API used to implement the multi-processing,
default True - use ``multiprocess.Pipe`` which is recommended
queue_size (int, optional): only useful when ``use_pipe`` is False - ``multiprocess.Queue``
is used, default 1000. Increasing this value can speed up the data reading, and more memory
will be consumed.
you need to create multiple readers first, these readers should be independent Returns:
to each other so that each process can work independently. ``generator``: a new reader which can be run in parallel
An example:
Example:
.. code-block:: python .. code-block:: python
reader0 = reader(["file01", "file02"]) import paddle.fluid as fluid
reader1 = reader(["file11", "file12"]) from paddle.fluid.io import multiprocess_reader
reader1 = reader(["file21", "file22"]) import numpy as np
reader = multiprocess_reader([reader0, reader1, reader2],
queue_size=100, use_pipe=False) sample_files = ['sample_file_1', 'sample_file_2']
def fake_input_files():
with open(sample_files[0], 'w') as f:
np.savez(f, a=np.array([1, 2]), b=np.array([3, 4]), c=np.array([5, 6]), d=np.array([7, 8]))
with open(sample_files[1], 'w') as f:
np.savez(f, a=np.array([9, 10]), b=np.array([11, 12]), c=np.array([13, 14]))
def generate_reader(file_name):
# load data file
def _impl():
data = np.load(file_name)
for item in sorted(data.files):
yield data[item],
return _impl
if __name__ == '__main__':
# generate sample input files
fake_input_files()
with fluid.program_guard(fluid.Program(), fluid.Program()):
place = fluid.CPUPlace()
# the 1st 2 is batch size
image = fluid.data(name='image', dtype='int64', shape=[2, 1, 2])
fluid.layers.Print(image)
# print detailed tensor info of image variable
reader = fluid.io.PyReader(feed_list=[image], capacity=2)
decorated_reader = multiprocess_reader(
[generate_reader(sample_files[0]), generate_reader(sample_files[1])], False)
reader.decorate_sample_generator(decorated_reader, batch_size=2, places=[place])
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for data in reader():
res = exe.run(feed=data, fetch_list=[image])
print(res[0])
# print below content in this case
# [[[1 2]], [[3 4]]]
# [[[5 6]], [[7 8]]]
# [[[9 10]], [[11 12]]]
# [13,14] will be dropped
""" """
try: try:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册