From c70fec99ab978120c259ba442636d91f0aae024e Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Thu, 11 Oct 2018 09:29:58 +0800 Subject: [PATCH] optimize pyreader --- paddle/fluid/API.spec | 1 + paddle/fluid/CMakeLists.txt | 3 +- python/paddle/fluid/layers/io.py | 325 ++++++++++++------ .../test_py_reader_using_executor.py | 48 ++- 4 files changed, 244 insertions(+), 133 deletions(-) diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index c6dd919a93d..d0ae8027469 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -178,6 +178,7 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,)) paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True)) +paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True)) paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None) paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) diff --git a/paddle/fluid/CMakeLists.txt b/paddle/fluid/CMakeLists.txt index 519a00fb073..48b36df6499 100644 --- a/paddle/fluid/CMakeLists.txt +++ b/paddle/fluid/CMakeLists.txt @@ -12,6 +12,5 @@ endif(NOT WIN32) if(WITH_INFERENCE) # NOTE: please add subdirectory inference at last. add_subdirectory(inference) + add_subdirectory(train) endif() - -add_subdirectory(train) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 81c78cba219..25fde782b7e 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -30,7 +30,8 @@ from ..unique_name import generate as unique_name __all__ = [ 'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer', - 'random_data_generator', 'py_reader', 'Preprocessor', 'load' + 'random_data_generator', 'py_reader', 'create_py_reader_by_data', + 'Preprocessor', 'load' ] @@ -470,6 +471,158 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): return monkey_patch_reader_methods(main_prog_var) +def _py_reader(capacity, + shapes, + dtypes, + lod_levels=None, + name=None, + use_double_buffer=True, + feed_list=None): + + if feed_list is not None: + if not isinstance(feed_list, list): + raise TypeError("feed_list should be a list of Variable" + " instead of " + str(type(feed_list))) + lod_levels = [] + dtypes = [] + shape_concat = [] + ranks = [] + shapes = [] + + for data in feed_list: + dtypes.append(data.dtype) + shape_concat.extend(data.shape) + ranks.append(len(data.shape)) + shapes.append(data.shape) + lod_levels.append(data.lod_level) + else: + dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] + shape_concat = [] + ranks = [] + + for shape in shapes: + shape_concat.extend(shape) + ranks.append(len(shape)) + + if lod_levels is None: + lod_levels = [0] * len(shapes) + + if name is None: + queue_name = unique_name('lod_tensor_blocking_queue') + reader_name = unique_name('create_py_reader') + double_buffer_name = unique_name('double_buffer') + else: + queue_name = "_".join([name, "queue"]) + reader_name = "_".join([name, "reader"]) + double_buffer_name = "_".join([name, "double_buffer"]) + + var = global_scope().var(queue_name) + feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes) + + startup_blk = default_startup_program().current_block() + startup_var = startup_blk.create_var(name=reader_name) + startup_blk.append_op( + type='create_py_reader', + inputs={'blocking_queue': [queue_name]}, + outputs={'Out': [startup_var]}, + attrs={ + 'shape_concat': shape_concat, + 'lod_levels': lod_levels, + 'ranks': ranks + }) + + startup_var.desc.set_dtypes(dtypes) + startup_var.persistable = True + + main_prog_var = _copy_reader_var_(default_main_program().current_block(), + startup_var) + + reader = monkey_patch_reader_methods(main_prog_var) + if use_double_buffer: + double_buffer_reader = double_buffer(reader, name=double_buffer_name) + # we return a double buffer reader. However, the reset method comes from + # py_reader. + double_buffer_reader.reset = reader.reset + reader = double_buffer_reader + + # monkey patch py_reader special methods + reader.queue = feed_queue + current_reset_method = reader.reset + reader.thread = None + reader.tensor_provider = None + reader.exited = False + + def start_provide_thread(func): + def __provider_thread__(): + for tensors in func(): + array = core.LoDTensorArray() + for item in tensors: + if not isinstance(item, core.LoDTensor): + tmp = core.LoDTensor() + tmp.set(item, core.CPUPlace()) + item = tmp + + array.append(item) + + if reader.exited: + break + feed_queue.push(array) + if reader.exited: + break + feed_queue.close() + + reader.thread = threading.Thread(target=__provider_thread__) + reader.thread.daemon = True + reader.thread.start() + + def __set_tensor_provider__(func): + reader.tensor_provider = func + + def __set_paddle_reader__(paddle_reader): + with program_guard(Program(), Program()): + actual_feed_list = feed_list + if actual_feed_list is None: + actual_feed_list = [] + counter = 0 + for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels): + name = str(counter) + actual_feed_list.append( + data( + name=name, + dtype=dtype, + shape=shape, + lod_level=lod_level)) + counter += 1 + + feeder = DataFeeder( + feed_list=actual_feed_list, place=core.CPUPlace()) + paddle_reader = feeder.decorate_reader( + paddle_reader, multi_devices=False) + + def __tensor_provider__(): + for slots in paddle_reader(): + yield [slots[str(idx)] for idx in six.moves.xrange(counter)] + + __set_tensor_provider__(__tensor_provider__) + + def __reset__(): + current_reset_method() + if reader.thread is not None and reader.tensor_provider is not None: + reader.exited = True + reader.thread.join() + reader.exited = False + + def __start__(): + start_provide_thread(reader.tensor_provider) + + reader.reset = __reset__ + reader.decorate_tensor_provider = __set_tensor_provider__ + reader.decorate_paddle_reader = __set_paddle_reader__ + reader.start = __start__ + + return reader + + def py_reader(capacity, shapes, dtypes, @@ -594,128 +747,72 @@ def py_reader(capacity, >>> except fluid.core.EOFException: >>> test_reader.reset() """ - dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] - shape_concat = [] - ranks = [] - - for shape in shapes: - shape_concat.extend(shape) - ranks.append(len(shape)) - - if lod_levels is None: - lod_levels = [0] * len(shapes) - - if name is None: - queue_name = unique_name('lod_tensor_blocking_queue') - reader_name = unique_name('create_py_reader') - double_buffer_name = unique_name('double_buffer') - else: - queue_name = "_".join([name, "queue"]) - reader_name = "_".join([name, "reader"]) - double_buffer_name = "_".join([name, "double_buffer"]) - - var = global_scope().var(queue_name) - feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes) - - startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=reader_name) - startup_blk.append_op( - type='create_py_reader', - inputs={'blocking_queue': [queue_name]}, - outputs={'Out': [startup_var]}, - attrs={ - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'ranks': ranks - }) - - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - - reader = monkey_patch_reader_methods(main_prog_var) - if use_double_buffer: - double_buffer_reader = double_buffer(reader, name=double_buffer_name) - # we return a double buffer reader. However, the reset method comes from - # py_reader. - double_buffer_reader.reset = reader.reset - reader = double_buffer_reader - - # monkey patch py_reader special methods - reader.queue = feed_queue - current_reset_method = reader.reset - reader.thread = None - reader.tensor_provider = None - reader.exited = False - - def start_provide_thread(func): - def __provider_thread__(): - for tensors in func(): - array = core.LoDTensorArray() - for item in tensors: - if not isinstance(item, core.LoDTensor): - tmp = core.LoDTensor() - tmp.set(item, core.CPUPlace()) - item = tmp + return _py_reader( + capacity=capacity, + shapes=shapes, + dtypes=dtypes, + lod_levels=lod_levels, + name=name, + use_double_buffer=use_double_buffer) - array.append(item) - if reader.exited: - break - feed_queue.push(array) - if reader.exited: - break - feed_queue.close() - - reader.thread = threading.Thread(target=__provider_thread__) - reader.thread.daemon = True - reader.thread.start() - - def __set_tensor_provider__(func): - reader.tensor_provider = func +def create_py_reader_by_data(capacity, + feed_list, + name=None, + use_double_buffer=True): + """ + Create a Python reader for data feeding in Python - def __set_paddle_reader__(paddle_reader): - with program_guard(Program(), Program()): - feed_list = [] - counter = 0 - for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels): - name = str(counter) - feed_list.append( - data( - name=name, - dtype=dtype, - shape=shape, - lod_level=lod_level)) - counter += 1 - - feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace()) - paddle_reader = feeder.decorate_reader( - paddle_reader, multi_devices=False) + This layer returns a Reader Variable. - def __tensor_provider__(): - for slots in paddle_reader(): - yield [slots[str(idx)] for idx in six.moves.xrange(counter)] + Works much like py_reader except that it's input is feed_list + instead of shapes, dtypes and lod_levels - __set_tensor_provider__(__tensor_provider__) + Args: + capacity(int): The buffer capacity maintained by :code:`py_reader`. + feed_list(list(Variable)): The data feed list. + name(basestring): The prefix Python queue name and Reader name. None will + be generated automatically. + use_double_buffer(bool): Whether use double buffer or not. - def __reset__(): - current_reset_method() - if reader.thread is not None and reader.tensor_provider is not None: - reader.exited = True - reader.thread.join() - reader.exited = False + Returns: + Variable: A Reader from which we can get feeding data. - def __start__(): - start_provide_thread(reader.tensor_provider) + Examples: - reader.reset = __reset__ - reader.decorate_tensor_provider = __set_tensor_provider__ - reader.decorate_paddle_reader = __set_paddle_reader__ - reader.start = __start__ + 1. The basic usage of :code:`py_reader` is as follows: - return reader + >>> import paddle.fluid as fluid + >>> import paddle.dataset.mnist as mnist + >>> + >>> image = fluid.layers.data(name='image', shape=[3,224,224], dtypes='float32') + >>> label = fluid.layers.data(name='label', shape=[1], dtypes='int64') + >>> reader = fluid.layers.create_py_reader_by_data(capacity=64, feed_list=[image, label]) + >>> reader.decorate_paddle_reader( + >>> paddle.reader.shuffle(paddle.batch(mnist.train()) + >>> + >>> img, label = fluid.layers.read_file(reader) + >>> loss = network(img, label) # some network definition + >>> + >>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program()) + >>> + >>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name) + >>> for epoch_id in range(10): + >>> reader.start() + >>> try: + >>> while True: + >>> exe.run(fetch_list=[loss.name]) + >>> except fluid.core.EOFException: + >>> reader.reset() + """ + return _py_reader( + capacity=capacity, + shapes=None, + dtypes=None, + lod_levels=None, + name=name, + use_double_buffer=use_double_buffer, + feed_list=feed_list) def open_files(filenames, diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py index b7fad9b3a60..b85b94c939f 100644 --- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py +++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py @@ -53,13 +53,22 @@ def simple_fc_net(in_size, hidden_sizes, batch_size, queue_capacity, - use_double_buffer=False): - reader = fluid.layers.py_reader( - capacity=queue_capacity, - shapes=[[-1, in_size], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - use_double_buffer=False) + use_double_buffer=False, + use_feed_list=True): + if use_feed_list: + data = fluid.layers.data(name="data", dtype='float32', shape=[in_size]) + label = fluid.layers.data(name='label', dtype='int64', shape=[1]) + reader = fluid.layers.create_py_reader_by_data( + capacity=queue_capacity, + use_double_buffer=False, + feed_list=[data, label]) + else: + reader = fluid.layers.py_reader( + capacity=queue_capacity, + shapes=[[-1, in_size], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64'], + use_double_buffer=False) feed_queue = reader.queue reader = fluid.layers.batch(reader, batch_size=batch_size) if use_double_buffer: @@ -100,14 +109,16 @@ class TestPyReaderUsingExecutor(unittest.TestCase): if core.is_compiled_with_cuda() else [False]): for use_parallel_executor in [False, True]: for use_double_buffer in [False, True]: - print('Test Parameters:'), - print({ - 'use_cuda': use_cuda, - 'use_parallel_executor': use_parallel_executor, - 'use_double_buffer': use_double_buffer - }) - self.main(use_cuda, use_parallel_executor, - use_double_buffer) + for use_feed_list in [False, True]: + print('Test Parameters:'), + print({ + 'use_cuda': use_cuda, + 'use_parallel_executor': use_parallel_executor, + 'use_double_buffer': use_double_buffer, + 'use_feed_list': use_feed_list + }) + self.main(use_cuda, use_parallel_executor, + use_double_buffer, use_feed_list) def random_reader(self): def reader(): @@ -143,12 +154,14 @@ class TestPyReaderUsingExecutor(unittest.TestCase): def main(self, use_cuda=True, use_parallel_executor=False, - use_double_buffer=False): + use_double_buffer=False, + use_feed_list=False): assert not use_cuda or use_cuda and core.is_compiled_with_cuda() self.use_cuda = use_cuda self.use_parallel_executor = use_parallel_executor self.use_double_buffer = use_double_buffer + self.use_feed_list = use_feed_list startup_program = fluid.Program() main_program = fluid.Program() @@ -160,7 +173,8 @@ class TestPyReaderUsingExecutor(unittest.TestCase): hidden_sizes=self.hidden_sizes, batch_size=self.batch_size, queue_capacity=self.queue_capacity, - use_double_buffer=self.use_double_buffer) + use_double_buffer=self.use_double_buffer, + use_feed_list=self.use_feed_list) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() -- GitLab