未验证 提交 5fc30522 编写于 作者: Q Qiao Longfei 提交者: GitHub

Merge pull request #13787 from PaddlePaddle/revert-13637-optimize-opyreader

Revert "optimize pyreader"
...@@ -178,7 +178,6 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k ...@@ -178,7 +178,6 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k
paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None))
paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,)) paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,))
paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True)) paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True))
paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True))
paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,))
paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None) paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None)
paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None)
......
...@@ -12,5 +12,6 @@ endif(NOT WIN32) ...@@ -12,5 +12,6 @@ endif(NOT WIN32)
if(WITH_INFERENCE) if(WITH_INFERENCE)
# NOTE: please add subdirectory inference at last. # NOTE: please add subdirectory inference at last.
add_subdirectory(inference) add_subdirectory(inference)
add_subdirectory(train)
endif() endif()
add_subdirectory(train)
...@@ -30,8 +30,7 @@ from ..unique_name import generate as unique_name ...@@ -30,8 +30,7 @@ from ..unique_name import generate as unique_name
__all__ = [ __all__ = [
'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer', 'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
'random_data_generator', 'py_reader', 'create_py_reader_by_data', 'random_data_generator', 'py_reader', 'Preprocessor', 'load'
'Preprocessor', 'load'
] ]
...@@ -471,158 +470,6 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): ...@@ -471,158 +470,6 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
return monkey_patch_reader_methods(main_prog_var) return monkey_patch_reader_methods(main_prog_var)
def _py_reader(capacity,
shapes,
dtypes,
lod_levels=None,
name=None,
use_double_buffer=True,
feed_list=None):
if feed_list is not None:
if not isinstance(feed_list, list):
raise TypeError("feed_list should be a list of Variable"
" instead of " + str(type(feed_list)))
lod_levels = []
dtypes = []
shape_concat = []
ranks = []
shapes = []
for data in feed_list:
dtypes.append(data.dtype)
shape_concat.extend(data.shape)
ranks.append(len(data.shape))
shapes.append(data.shape)
lod_levels.append(data.lod_level)
else:
dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
shape_concat = []
ranks = []
for shape in shapes:
shape_concat.extend(shape)
ranks.append(len(shape))
if lod_levels is None:
lod_levels = [0] * len(shapes)
if name is None:
queue_name = unique_name('lod_tensor_blocking_queue')
reader_name = unique_name('create_py_reader')
double_buffer_name = unique_name('double_buffer')
else:
queue_name = "_".join([name, "queue"])
reader_name = "_".join([name, "reader"])
double_buffer_name = "_".join([name, "double_buffer"])
var = global_scope().var(queue_name)
feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
startup_blk = default_startup_program().current_block()
startup_var = startup_blk.create_var(name=reader_name)
startup_blk.append_op(
type='create_py_reader',
inputs={'blocking_queue': [queue_name]},
outputs={'Out': [startup_var]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks
})
startup_var.desc.set_dtypes(dtypes)
startup_var.persistable = True
main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var)
reader = monkey_patch_reader_methods(main_prog_var)
if use_double_buffer:
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader.reset = reader.reset
reader = double_buffer_reader
# monkey patch py_reader special methods
reader.queue = feed_queue
current_reset_method = reader.reset
reader.thread = None
reader.tensor_provider = None
reader.exited = False
def start_provide_thread(func):
def __provider_thread__():
for tensors in func():
array = core.LoDTensorArray()
for item in tensors:
if not isinstance(item, core.LoDTensor):
tmp = core.LoDTensor()
tmp.set(item, core.CPUPlace())
item = tmp
array.append(item)
if reader.exited:
break
feed_queue.push(array)
if reader.exited:
break
feed_queue.close()
reader.thread = threading.Thread(target=__provider_thread__)
reader.thread.daemon = True
reader.thread.start()
def __set_tensor_provider__(func):
reader.tensor_provider = func
def __set_paddle_reader__(paddle_reader):
with program_guard(Program(), Program()):
actual_feed_list = feed_list
if actual_feed_list is None:
actual_feed_list = []
counter = 0
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
name = str(counter)
actual_feed_list.append(
data(
name=name,
dtype=dtype,
shape=shape,
lod_level=lod_level))
counter += 1
feeder = DataFeeder(
feed_list=actual_feed_list, place=core.CPUPlace())
paddle_reader = feeder.decorate_reader(
paddle_reader, multi_devices=False)
def __tensor_provider__():
for slots in paddle_reader():
yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
__set_tensor_provider__(__tensor_provider__)
def __reset__():
current_reset_method()
if reader.thread is not None and reader.tensor_provider is not None:
reader.exited = True
reader.thread.join()
reader.exited = False
def __start__():
start_provide_thread(reader.tensor_provider)
reader.reset = __reset__
reader.decorate_tensor_provider = __set_tensor_provider__
reader.decorate_paddle_reader = __set_paddle_reader__
reader.start = __start__
return reader
def py_reader(capacity, def py_reader(capacity,
shapes, shapes,
dtypes, dtypes,
...@@ -747,72 +594,128 @@ def py_reader(capacity, ...@@ -747,72 +594,128 @@ def py_reader(capacity,
>>> except fluid.core.EOFException: >>> except fluid.core.EOFException:
>>> test_reader.reset() >>> test_reader.reset()
""" """
return _py_reader( dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
capacity=capacity, shape_concat = []
shapes=shapes, ranks = []
dtypes=dtypes,
lod_levels=lod_levels,
name=name,
use_double_buffer=use_double_buffer)
for shape in shapes:
shape_concat.extend(shape)
ranks.append(len(shape))
def create_py_reader_by_data(capacity, if lod_levels is None:
feed_list, lod_levels = [0] * len(shapes)
name=None,
use_double_buffer=True):
"""
Create a Python reader for data feeding in Python
This layer returns a Reader Variable. if name is None:
queue_name = unique_name('lod_tensor_blocking_queue')
reader_name = unique_name('create_py_reader')
double_buffer_name = unique_name('double_buffer')
else:
queue_name = "_".join([name, "queue"])
reader_name = "_".join([name, "reader"])
double_buffer_name = "_".join([name, "double_buffer"])
Works much like py_reader except that it's input is feed_list var = global_scope().var(queue_name)
instead of shapes, dtypes and lod_levels feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
Args: startup_blk = default_startup_program().current_block()
capacity(int): The buffer capacity maintained by :code:`py_reader`. startup_var = startup_blk.create_var(name=reader_name)
feed_list(list(Variable)): The data feed list. startup_blk.append_op(
name(basestring): The prefix Python queue name and Reader name. None will type='create_py_reader',
be generated automatically. inputs={'blocking_queue': [queue_name]},
use_double_buffer(bool): Whether use double buffer or not. outputs={'Out': [startup_var]},
attrs={
'shape_concat': shape_concat,
'lod_levels': lod_levels,
'ranks': ranks
})
Returns: startup_var.desc.set_dtypes(dtypes)
Variable: A Reader from which we can get feeding data. startup_var.persistable = True
Examples: main_prog_var = _copy_reader_var_(default_main_program().current_block(),
startup_var)
1. The basic usage of :code:`py_reader` is as follows: reader = monkey_patch_reader_methods(main_prog_var)
if use_double_buffer:
double_buffer_reader = double_buffer(reader, name=double_buffer_name)
# we return a double buffer reader. However, the reset method comes from
# py_reader.
double_buffer_reader.reset = reader.reset
reader = double_buffer_reader
>>> import paddle.fluid as fluid # monkey patch py_reader special methods
>>> import paddle.dataset.mnist as mnist reader.queue = feed_queue
>>> current_reset_method = reader.reset
>>> image = fluid.layers.data(name='image', shape=[3,224,224], dtypes='float32') reader.thread = None
>>> label = fluid.layers.data(name='label', shape=[1], dtypes='int64') reader.tensor_provider = None
>>> reader = fluid.layers.create_py_reader_by_data(capacity=64, feed_list=[image, label]) reader.exited = False
>>> reader.decorate_paddle_reader(
>>> paddle.reader.shuffle(paddle.batch(mnist.train()) def start_provide_thread(func):
>>> def __provider_thread__():
>>> img, label = fluid.layers.read_file(reader) for tensors in func():
>>> loss = network(img, label) # some network definition array = core.LoDTensorArray()
>>> for item in tensors:
>>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program()) if not isinstance(item, core.LoDTensor):
>>> tmp = core.LoDTensor()
>>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name) tmp.set(item, core.CPUPlace())
>>> for epoch_id in range(10): item = tmp
>>> reader.start()
>>> try: array.append(item)
>>> while True:
>>> exe.run(fetch_list=[loss.name]) if reader.exited:
>>> except fluid.core.EOFException: break
>>> reader.reset() feed_queue.push(array)
""" if reader.exited:
return _py_reader( break
capacity=capacity, feed_queue.close()
shapes=None,
dtypes=None, reader.thread = threading.Thread(target=__provider_thread__)
lod_levels=None, reader.thread.daemon = True
name=name, reader.thread.start()
use_double_buffer=use_double_buffer,
feed_list=feed_list) def __set_tensor_provider__(func):
reader.tensor_provider = func
def __set_paddle_reader__(paddle_reader):
with program_guard(Program(), Program()):
feed_list = []
counter = 0
for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
name = str(counter)
feed_list.append(
data(
name=name,
dtype=dtype,
shape=shape,
lod_level=lod_level))
counter += 1
feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
paddle_reader = feeder.decorate_reader(
paddle_reader, multi_devices=False)
def __tensor_provider__():
for slots in paddle_reader():
yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
__set_tensor_provider__(__tensor_provider__)
def __reset__():
current_reset_method()
if reader.thread is not None and reader.tensor_provider is not None:
reader.exited = True
reader.thread.join()
reader.exited = False
def __start__():
start_provide_thread(reader.tensor_provider)
reader.reset = __reset__
reader.decorate_tensor_provider = __set_tensor_provider__
reader.decorate_paddle_reader = __set_paddle_reader__
reader.start = __start__
return reader
def open_files(filenames, def open_files(filenames,
......
...@@ -53,22 +53,13 @@ def simple_fc_net(in_size, ...@@ -53,22 +53,13 @@ def simple_fc_net(in_size,
hidden_sizes, hidden_sizes,
batch_size, batch_size,
queue_capacity, queue_capacity,
use_double_buffer=False, use_double_buffer=False):
use_feed_list=True): reader = fluid.layers.py_reader(
if use_feed_list: capacity=queue_capacity,
data = fluid.layers.data(name="data", dtype='float32', shape=[in_size]) shapes=[[-1, in_size], [-1, 1]],
label = fluid.layers.data(name='label', dtype='int64', shape=[1]) lod_levels=[0, 0],
reader = fluid.layers.create_py_reader_by_data( dtypes=['float32', 'int64'],
capacity=queue_capacity, use_double_buffer=False)
use_double_buffer=False,
feed_list=[data, label])
else:
reader = fluid.layers.py_reader(
capacity=queue_capacity,
shapes=[[-1, in_size], [-1, 1]],
lod_levels=[0, 0],
dtypes=['float32', 'int64'],
use_double_buffer=False)
feed_queue = reader.queue feed_queue = reader.queue
reader = fluid.layers.batch(reader, batch_size=batch_size) reader = fluid.layers.batch(reader, batch_size=batch_size)
if use_double_buffer: if use_double_buffer:
...@@ -109,16 +100,14 @@ class TestPyReaderUsingExecutor(unittest.TestCase): ...@@ -109,16 +100,14 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
if core.is_compiled_with_cuda() else [False]): if core.is_compiled_with_cuda() else [False]):
for use_parallel_executor in [False, True]: for use_parallel_executor in [False, True]:
for use_double_buffer in [False, True]: for use_double_buffer in [False, True]:
for use_feed_list in [False, True]: print('Test Parameters:'),
print('Test Parameters:'), print({
print({ 'use_cuda': use_cuda,
'use_cuda': use_cuda, 'use_parallel_executor': use_parallel_executor,
'use_parallel_executor': use_parallel_executor, 'use_double_buffer': use_double_buffer
'use_double_buffer': use_double_buffer, })
'use_feed_list': use_feed_list self.main(use_cuda, use_parallel_executor,
}) use_double_buffer)
self.main(use_cuda, use_parallel_executor,
use_double_buffer, use_feed_list)
def random_reader(self): def random_reader(self):
def reader(): def reader():
...@@ -154,14 +143,12 @@ class TestPyReaderUsingExecutor(unittest.TestCase): ...@@ -154,14 +143,12 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
def main(self, def main(self,
use_cuda=True, use_cuda=True,
use_parallel_executor=False, use_parallel_executor=False,
use_double_buffer=False, use_double_buffer=False):
use_feed_list=False):
assert not use_cuda or use_cuda and core.is_compiled_with_cuda() assert not use_cuda or use_cuda and core.is_compiled_with_cuda()
self.use_cuda = use_cuda self.use_cuda = use_cuda
self.use_parallel_executor = use_parallel_executor self.use_parallel_executor = use_parallel_executor
self.use_double_buffer = use_double_buffer self.use_double_buffer = use_double_buffer
self.use_feed_list = use_feed_list
startup_program = fluid.Program() startup_program = fluid.Program()
main_program = fluid.Program() main_program = fluid.Program()
...@@ -173,8 +160,7 @@ class TestPyReaderUsingExecutor(unittest.TestCase): ...@@ -173,8 +160,7 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
hidden_sizes=self.hidden_sizes, hidden_sizes=self.hidden_sizes,
batch_size=self.batch_size, batch_size=self.batch_size,
queue_capacity=self.queue_capacity, queue_capacity=self.queue_capacity,
use_double_buffer=self.use_double_buffer, use_double_buffer=self.use_double_buffer)
use_feed_list=self.use_feed_list)
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册