diff --git a/paddle/fluid/CMakeLists.txt b/paddle/fluid/CMakeLists.txt
index 519a00fb073b08f6c88de8186de187476b548fd3..48b36df6499e59fe742766b5f81fd30a9fb8b900 100644
--- a/paddle/fluid/CMakeLists.txt
+++ b/paddle/fluid/CMakeLists.txt
@@ -12,6 +12,5 @@ endif(NOT WIN32)
 if(WITH_INFERENCE)
   # NOTE: please add subdirectory inference at last.
   add_subdirectory(inference)
+  add_subdirectory(train)
 endif()
-
-add_subdirectory(train)
diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py
index 75c29b12724d53783b9748d6df066c52bd232482..368201ea7eea410aa08d58e3b1eec4305bdc3460 100644
--- a/python/paddle/fluid/layers/io.py
+++ b/python/paddle/fluid/layers/io.py
@@ -30,7 +30,8 @@ from ..unique_name import generate as unique_name

 __all__ = [
     'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer',
-    'random_data_generator', 'py_reader', 'Preprocessor', 'load'
+    'random_data_generator', 'py_reader', 'py_reader_by_data', 'Preprocessor',
+    'load'
 ]


@@ -471,6 +472,161 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True):
     return monkey_patch_reader_methods(main_prog_var)


+def _py_reader(capacity,
+               shapes,
+               dtypes,
+               lod_levels=None,
+               name=None,
+               use_double_buffer=True,
+               feed_list=None):
+    if feed_list is not None:
+        assert isinstance(feed_list, list)
+        lod_levels = []
+        dtypes = []
+        shape_concat = []
+        ranks = []
+        shapes = []
+
+        # NOTE: do not name the loop variable `data`, otherwise it would
+        # shadow the fluid.layers.data helper used in __set_paddle_reader__.
+        for feed_data in feed_list:
+            dtypes.append(feed_data.dtype)
+            shape_concat.extend(feed_data.shape)
+            ranks.append(len(feed_data.shape))
+            shapes.append(feed_data.shape)
+            lod_levels.append(feed_data.lod_level)
+    else:
+        dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
+        shape_concat = []
+        ranks = []
+
+        for shape in shapes:
+            shape_concat.extend(shape)
+            ranks.append(len(shape))
+
+        if lod_levels is None:
+            lod_levels = [0] * len(shapes)
+
+    if name is None:
+        queue_name = unique_name('lod_tensor_blocking_queue')
+        reader_name = unique_name('create_py_reader')
+        double_buffer_name = unique_name('double_buffer')
+    else:
+        queue_name = "_".join([name, "queue"])
+        reader_name = "_".join([name, "reader"])
+        double_buffer_name = "_".join([name, "double_buffer"])
+
+    var = global_scope().var(queue_name)
+    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
+
+    startup_blk = default_startup_program().current_block()
+    startup_var = startup_blk.create_var(name=reader_name)
+    startup_blk.append_op(
+        type='create_py_reader',
+        inputs={'blocking_queue': [queue_name]},
+        outputs={'Out': [startup_var]},
+        attrs={
+            'shape_concat': shape_concat,
+            'lod_levels': lod_levels,
+            'ranks': ranks
+        })
+
+    startup_var.desc.set_dtypes(dtypes)
+    startup_var.desc.set_lod_levels(lod_levels)
+    startup_var.persistable = True
+
+    main_prog_var = _copy_reader_var_(default_main_program().current_block(),
+                                      startup_var)
+
+    reader = monkey_patch_reader_methods(main_prog_var)
+    if use_double_buffer:
+        double_buffer_reader = double_buffer(reader, name=double_buffer_name)
+        # we return a double buffer reader. However, the reset method comes from
+        # py_reader.
+        double_buffer_reader.reset = reader.reset
+        reader = double_buffer_reader
+
+    # monkey patch py_reader special methods
+    reader.queue = feed_queue
+    current_reset_method = reader.reset
+    reader.thread = None
+    reader.tensor_provider = None
+    reader.exited = False
+
+    def start_provide_thread(func):
+        def __provider_thread__():
+            for tensors in func():
+                array = core.LoDTensorArray()
+                for item in tensors:
+                    if not isinstance(item, core.LoDTensor):
+                        tmp = core.LoDTensor()
+                        tmp.set(item, core.CPUPlace())
+                        item = tmp
+
+                    array.append(item)
+
+                if reader.exited:
+                    break
+                feed_queue.push(array)
+                if reader.exited:
+                    break
+            feed_queue.close()
+
+        reader.thread = threading.Thread(target=__provider_thread__)
+        reader.thread.daemon = True
+        reader.thread.start()
+
+    def __set_tensor_provider__(func):
+        reader.tensor_provider = func
+
+    def __set_paddle_reader__(paddle_reader):
+        with program_guard(Program(), Program()):
+            # Rebinding `feed_list` itself would make it a local variable
+            # and raise UnboundLocalError, so bind it to a new name.
+            actual_feed_list = feed_list
+            if actual_feed_list is None:
+                actual_feed_list = []
+                counter = 0
+                for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
+                    name = str(counter)
+                    actual_feed_list.append(
+                        data(
+                            name=name,
+                            dtype=dtype,
+                            shape=shape,
+                            lod_level=lod_level))
+                    counter += 1
+
+            data_names = [feed_data.name for feed_data in actual_feed_list]
+            feeder = DataFeeder(
+                feed_list=actual_feed_list, place=core.CPUPlace())
+            paddle_reader = feeder.decorate_reader(
+                paddle_reader, multi_devices=False)
+
+            def __tensor_provider__():
+                for slots in paddle_reader():
+                    yield [slots[data_name] for data_name in data_names]
+
+            __set_tensor_provider__(__tensor_provider__)
+
+    def __reset__():
+        current_reset_method()
+        if reader.thread is not None and reader.tensor_provider is not None:
+            reader.exited = True
+            reader.thread.join()
+            reader.exited = False
+
+    def __start__():
+        start_provide_thread(reader.tensor_provider)
+
+    reader.reset = __reset__
+    reader.decorate_tensor_provider = __set_tensor_provider__
+    reader.decorate_paddle_reader = __set_paddle_reader__
+    reader.start = __start__
+
+    return reader
+
+
 def py_reader(capacity,
               shapes,
               dtypes,
@@ -597,129 +753,24 @@
         >>>     except fluid.core.EOFException:
         >>>         test_reader.reset()
     """
-    dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes]
-    shape_concat = []
-    ranks = []
-
-    for shape in shapes:
-        shape_concat.extend(shape)
-        ranks.append(len(shape))
-
-    if lod_levels is None:
-        lod_levels = [0] * len(shapes)
-
-    if name is None:
-        queue_name = unique_name('lod_tensor_blocking_queue')
-        reader_name = unique_name('create_py_reader')
-        double_buffer_name = unique_name('double_buffer')
-    else:
-        queue_name = "_".join([name, "queue"])
-        reader_name = "_".join([name, "reader"])
-        double_buffer_name = "_".join([name, "double_buffer"])
-
-    var = global_scope().var(queue_name)
-    feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes)
-
-    startup_blk = default_startup_program().current_block()
-    startup_var = startup_blk.create_var(name=reader_name)
-    startup_blk.append_op(
-        type='create_py_reader',
-        inputs={'blocking_queue': [queue_name]},
-        outputs={'Out': [startup_var]},
-        attrs={
-            'shape_concat': shape_concat,
-            'lod_levels': lod_levels,
-            'ranks': ranks
-        })
-
-    startup_var.desc.set_dtypes(dtypes)
-    startup_var.desc.set_lod_levels(lod_levels)
-    startup_var.persistable = True
-
-    main_prog_var = _copy_reader_var_(default_main_program().current_block(),
-                                      startup_var)
-
-    reader = monkey_patch_reader_methods(main_prog_var)
-    if use_double_buffer:
-        double_buffer_reader = double_buffer(reader, name=double_buffer_name)
-        # we return a double buffer reader. However, the reset method comes from
-        # py_reader.
-        double_buffer_reader.reset = reader.reset
-        reader = double_buffer_reader
-
-    # monkey patch py_reader special methods
-    reader.queue = feed_queue
-    current_reset_method = reader.reset
-    reader.thread = None
-    reader.tensor_provider = None
-    reader.exited = False
-
-    def start_provide_thread(func):
-        def __provider_thread__():
-            for tensors in func():
-                array = core.LoDTensorArray()
-                for item in tensors:
-                    if not isinstance(item, core.LoDTensor):
-                        tmp = core.LoDTensor()
-                        tmp.set(item, core.CPUPlace())
-                        item = tmp
-
-                    array.append(item)
-
-                if reader.exited:
-                    break
-                feed_queue.push(array)
-                if reader.exited:
-                    break
-            feed_queue.close()
-
-        reader.thread = threading.Thread(target=__provider_thread__)
-        reader.thread.daemon = True
-        reader.thread.start()
-
-    def __set_tensor_provider__(func):
-        reader.tensor_provider = func
-
-    def __set_paddle_reader__(paddle_reader):
-        with program_guard(Program(), Program()):
-            feed_list = []
-            counter = 0
-            for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels):
-                name = str(counter)
-                feed_list.append(
-                    data(
-                        name=name,
-                        dtype=dtype,
-                        shape=shape,
-                        lod_level=lod_level))
-                counter += 1
-
-            feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace())
-            paddle_reader = feeder.decorate_reader(
-                paddle_reader, multi_devices=False)
-
-            def __tensor_provider__():
-                for slots in paddle_reader():
-                    yield [slots[str(idx)] for idx in six.moves.xrange(counter)]
-
-            __set_tensor_provider__(__tensor_provider__)
-
-    def __reset__():
-        current_reset_method()
-        if reader.thread is not None and reader.tensor_provider is not None:
-            reader.exited = True
-            reader.thread.join()
-            reader.exited = False
+    return _py_reader(
+        capacity=capacity,
+        shapes=shapes,
+        dtypes=dtypes,
+        lod_levels=lod_levels,
+        name=name,
+        use_double_buffer=use_double_buffer)

-    def __start__():
-        start_provide_thread(reader.tensor_provider)

-    reader.reset = __reset__
-    reader.decorate_tensor_provider = __set_tensor_provider__
-    reader.decorate_paddle_reader = __set_paddle_reader__
-    reader.start = __start__
-
-    return reader
+def py_reader_by_data(capacity, feed_list, name=None, use_double_buffer=True):
+    return _py_reader(
+        capacity=capacity,
+        shapes=None,
+        dtypes=None,
+        lod_levels=None,
+        name=name,
+        use_double_buffer=use_double_buffer,
+        feed_list=feed_list)


 def open_files(filenames,
diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
index b7fad9b3a60632adb564e1d155a3d935706b467f..aaa6e762d6e1608d49777ce29c127ee70671cc63 100644
--- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py
@@ -53,13 +53,22 @@ def simple_fc_net(in_size,
                   hidden_sizes,
                   batch_size,
                   queue_capacity,
-                  use_double_buffer=False):
-    reader = fluid.layers.py_reader(
-        capacity=queue_capacity,
-        shapes=[[-1, in_size], [-1, 1]],
-        lod_levels=[0, 0],
-        dtypes=['float32', 'int64'],
-        use_double_buffer=False)
+                  use_double_buffer=False,
+                  use_feed_list=True):
+    if use_feed_list:
+        data = fluid.layers.data(name="data", dtype='float32', shape=[in_size])
+        label = fluid.layers.data(name='label', dtype='int64', shape=[1])
+        reader = fluid.layers.py_reader_by_data(
+            capacity=queue_capacity,
+            use_double_buffer=False,
+            feed_list=[data, label])
+    else:
+        reader = fluid.layers.py_reader(
+            capacity=queue_capacity,
+            shapes=[[-1, in_size], [-1, 1]],
+            lod_levels=[0, 0],
+            dtypes=['float32', 'int64'],
+            use_double_buffer=False)
     feed_queue = reader.queue
     reader = fluid.layers.batch(reader, batch_size=batch_size)
     if use_double_buffer:
@@ -100,14 +109,16 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
                          if core.is_compiled_with_cuda() else [False]):
             for use_parallel_executor in [False, True]:
                 for use_double_buffer in [False, True]:
-                    print('Test Parameters:'),
-                    print({
-                        'use_cuda': use_cuda,
-                        'use_parallel_executor': use_parallel_executor,
-                        'use_double_buffer': use_double_buffer
-                    })
-                    self.main(use_cuda, use_parallel_executor,
-                              use_double_buffer)
+                    for use_feed_list in [False, True]:
+                        print('Test Parameters:')
+                        print({
+                            'use_cuda': use_cuda,
+                            'use_parallel_executor': use_parallel_executor,
+                            'use_double_buffer': use_double_buffer,
+                            'use_feed_list': use_feed_list
+                        })
+                        self.main(use_cuda, use_parallel_executor,
+                                  use_double_buffer, use_feed_list)

     def random_reader(self):
         def reader():
@@ -143,12 +154,14 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
     def main(self,
              use_cuda=True,
              use_parallel_executor=False,
-             use_double_buffer=False):
+             use_double_buffer=False,
+             use_feed_list=False):
         assert not use_cuda or use_cuda and core.is_compiled_with_cuda()

         self.use_cuda = use_cuda
         self.use_parallel_executor = use_parallel_executor
         self.use_double_buffer = use_double_buffer
+        self.use_feed_list = use_feed_list

         startup_program = fluid.Program()
         main_program = fluid.Program()
@@ -160,7 +173,8 @@ class TestPyReaderUsingExecutor(unittest.TestCase):
                 hidden_sizes=self.hidden_sizes,
                 batch_size=self.batch_size,
                 queue_capacity=self.queue_capacity,
-                use_double_buffer=self.use_double_buffer)
+                use_double_buffer=self.use_double_buffer,
+                use_feed_list=self.use_feed_list)

         place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
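
For reviewers, a minimal end-to-end sketch of how the new `py_reader_by_data` API is meant to be used, mirroring the pattern in the existing `py_reader` docstring. This is not part of the patch: the MNIST pipeline and the variable names (`image`, `label`, `prediction`, `loss`) are illustrative only.

```python
import paddle
import paddle.fluid as fluid

# Declare the feed variables once; py_reader_by_data infers shapes,
# dtypes and lod_levels from them instead of taking separate arguments.
image = fluid.layers.data(name='image', dtype='float32', shape=[784])
label = fluid.layers.data(name='label', dtype='int64', shape=[1])

reader = fluid.layers.py_reader_by_data(
    capacity=64, feed_list=[image, label])

# Feed the queue from a plain Python batch reader; decorate_paddle_reader
# is one of the methods monkey-patched onto the reader by _py_reader.
reader.decorate_paddle_reader(
    paddle.batch(paddle.dataset.mnist.train(), batch_size=32))

img, lbl = fluid.layers.read_file(reader)
prediction = fluid.layers.fc(input=img, size=10, act='softmax')
loss = fluid.layers.mean(
    fluid.layers.cross_entropy(input=prediction, label=lbl))

exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())

reader.start()  # launches the feeding thread
try:
    while True:
        exe.run(fetch_list=[loss.name])
except fluid.core.EOFException:
    reader.reset()  # joins the feeding thread and reopens the queue
```

Compared with `py_reader`, the only difference is construction: shapes, dtypes and LoD levels come from the `feed_list` variables rather than being spelled out a second time, which removes a common source of shape/dtype mismatch between the feed declarations and the reader.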