From 9d087d513979c974a849f988292772c02414f0ad Mon Sep 17 00:00:00 2001 From: Qiao Longfei Date: Tue, 9 Oct 2018 18:54:45 +0800 Subject: [PATCH] Revert "optimize pyreader" test=develop --- paddle/fluid/API.spec | 1 - paddle/fluid/CMakeLists.txt | 3 +- python/paddle/fluid/layers/io.py | 325 ++++++------------ .../test_py_reader_using_executor.py | 48 +-- 4 files changed, 133 insertions(+), 244 deletions(-) diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index d0ae802746..c6dd919a93 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -178,7 +178,6 @@ paddle.fluid.layers.batch ArgSpec(args=['reader', 'batch_size'], varargs=None, k paddle.fluid.layers.double_buffer ArgSpec(args=['reader', 'place', 'name'], varargs=None, keywords=None, defaults=(None, None)) paddle.fluid.layers.random_data_generator ArgSpec(args=['low', 'high', 'shapes', 'lod_levels', 'for_parallel'], varargs=None, keywords=None, defaults=(True,)) paddle.fluid.layers.py_reader ArgSpec(args=['capacity', 'shapes', 'dtypes', 'lod_levels', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, None, True)) -paddle.fluid.layers.create_py_reader_by_data ArgSpec(args=['capacity', 'feed_list', 'name', 'use_double_buffer'], varargs=None, keywords=None, defaults=(None, True)) paddle.fluid.layers.Preprocessor.__init__ ArgSpec(args=['self', 'reader', 'name'], varargs=None, keywords=None, defaults=(None,)) paddle.fluid.layers.Preprocessor.block ArgSpec(args=[], varargs='args', keywords='kwds', defaults=None) paddle.fluid.layers.Preprocessor.inputs ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None) diff --git a/paddle/fluid/CMakeLists.txt b/paddle/fluid/CMakeLists.txt index 48b36df649..519a00fb07 100644 --- a/paddle/fluid/CMakeLists.txt +++ b/paddle/fluid/CMakeLists.txt @@ -12,5 +12,6 @@ endif(NOT WIN32) if(WITH_INFERENCE) # NOTE: please add subdirectory inference at last. 
add_subdirectory(inference) - add_subdirectory(train) endif() + +add_subdirectory(train) diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 25fde782b7..81c78cba21 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -30,8 +30,7 @@ from ..unique_name import generate as unique_name __all__ = [ 'data', 'open_files', 'read_file', 'shuffle', 'batch', 'double_buffer', - 'random_data_generator', 'py_reader', 'create_py_reader_by_data', - 'Preprocessor', 'load' + 'random_data_generator', 'py_reader', 'Preprocessor', 'load' ] @@ -471,158 +470,6 @@ def random_data_generator(low, high, shapes, lod_levels, for_parallel=True): return monkey_patch_reader_methods(main_prog_var) -def _py_reader(capacity, - shapes, - dtypes, - lod_levels=None, - name=None, - use_double_buffer=True, - feed_list=None): - - if feed_list is not None: - if not isinstance(feed_list, list): - raise TypeError("feed_list should be a list of Variable" - " instead of " + str(type(feed_list))) - lod_levels = [] - dtypes = [] - shape_concat = [] - ranks = [] - shapes = [] - - for data in feed_list: - dtypes.append(data.dtype) - shape_concat.extend(data.shape) - ranks.append(len(data.shape)) - shapes.append(data.shape) - lod_levels.append(data.lod_level) - else: - dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] - shape_concat = [] - ranks = [] - - for shape in shapes: - shape_concat.extend(shape) - ranks.append(len(shape)) - - if lod_levels is None: - lod_levels = [0] * len(shapes) - - if name is None: - queue_name = unique_name('lod_tensor_blocking_queue') - reader_name = unique_name('create_py_reader') - double_buffer_name = unique_name('double_buffer') - else: - queue_name = "_".join([name, "queue"]) - reader_name = "_".join([name, "reader"]) - double_buffer_name = "_".join([name, "double_buffer"]) - - var = global_scope().var(queue_name) - feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes) - - startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=reader_name) - startup_blk.append_op( - type='create_py_reader', - inputs={'blocking_queue': [queue_name]}, - outputs={'Out': [startup_var]}, - attrs={ - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'ranks': ranks - }) - - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - - main_prog_var = _copy_reader_var_(default_main_program().current_block(), - startup_var) - - reader = monkey_patch_reader_methods(main_prog_var) - if use_double_buffer: - double_buffer_reader = double_buffer(reader, name=double_buffer_name) - # we return a double buffer reader. However, the reset method comes from - # py_reader. 
- double_buffer_reader.reset = reader.reset - reader = double_buffer_reader - - # monkey patch py_reader special methods - reader.queue = feed_queue - current_reset_method = reader.reset - reader.thread = None - reader.tensor_provider = None - reader.exited = False - - def start_provide_thread(func): - def __provider_thread__(): - for tensors in func(): - array = core.LoDTensorArray() - for item in tensors: - if not isinstance(item, core.LoDTensor): - tmp = core.LoDTensor() - tmp.set(item, core.CPUPlace()) - item = tmp - - array.append(item) - - if reader.exited: - break - feed_queue.push(array) - if reader.exited: - break - feed_queue.close() - - reader.thread = threading.Thread(target=__provider_thread__) - reader.thread.daemon = True - reader.thread.start() - - def __set_tensor_provider__(func): - reader.tensor_provider = func - - def __set_paddle_reader__(paddle_reader): - with program_guard(Program(), Program()): - actual_feed_list = feed_list - if actual_feed_list is None: - actual_feed_list = [] - counter = 0 - for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels): - name = str(counter) - actual_feed_list.append( - data( - name=name, - dtype=dtype, - shape=shape, - lod_level=lod_level)) - counter += 1 - - feeder = DataFeeder( - feed_list=actual_feed_list, place=core.CPUPlace()) - paddle_reader = feeder.decorate_reader( - paddle_reader, multi_devices=False) - - def __tensor_provider__(): - for slots in paddle_reader(): - yield [slots[str(idx)] for idx in six.moves.xrange(counter)] - - __set_tensor_provider__(__tensor_provider__) - - def __reset__(): - current_reset_method() - if reader.thread is not None and reader.tensor_provider is not None: - reader.exited = True - reader.thread.join() - reader.exited = False - - def __start__(): - start_provide_thread(reader.tensor_provider) - - reader.reset = __reset__ - reader.decorate_tensor_provider = __set_tensor_provider__ - reader.decorate_paddle_reader = __set_paddle_reader__ - reader.start = __start__ - - return reader - - def py_reader(capacity, shapes, dtypes, @@ -747,72 +594,128 @@ def py_reader(capacity, >>> except fluid.core.EOFException: >>> test_reader.reset() """ - return _py_reader( - capacity=capacity, - shapes=shapes, - dtypes=dtypes, - lod_levels=lod_levels, - name=name, - use_double_buffer=use_double_buffer) + dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] + shape_concat = [] + ranks = [] + for shape in shapes: + shape_concat.extend(shape) + ranks.append(len(shape)) -def create_py_reader_by_data(capacity, - feed_list, - name=None, - use_double_buffer=True): - """ - Create a Python reader for data feeding in Python + if lod_levels is None: + lod_levels = [0] * len(shapes) - This layer returns a Reader Variable. + if name is None: + queue_name = unique_name('lod_tensor_blocking_queue') + reader_name = unique_name('create_py_reader') + double_buffer_name = unique_name('double_buffer') + else: + queue_name = "_".join([name, "queue"]) + reader_name = "_".join([name, "reader"]) + double_buffer_name = "_".join([name, "double_buffer"]) - Works much like py_reader except that it's input is feed_list - instead of shapes, dtypes and lod_levels + var = global_scope().var(queue_name) + feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, shapes) - Args: - capacity(int): The buffer capacity maintained by :code:`py_reader`. - feed_list(list(Variable)): The data feed list. - name(basestring): The prefix Python queue name and Reader name. None will - be generated automatically. 
- use_double_buffer(bool): Whether use double buffer or not. + startup_blk = default_startup_program().current_block() + startup_var = startup_blk.create_var(name=reader_name) + startup_blk.append_op( + type='create_py_reader', + inputs={'blocking_queue': [queue_name]}, + outputs={'Out': [startup_var]}, + attrs={ + 'shape_concat': shape_concat, + 'lod_levels': lod_levels, + 'ranks': ranks + }) - Returns: - Variable: A Reader from which we can get feeding data. + startup_var.desc.set_dtypes(dtypes) + startup_var.persistable = True - Examples: + main_prog_var = _copy_reader_var_(default_main_program().current_block(), + startup_var) - 1. The basic usage of :code:`py_reader` is as follows: + reader = monkey_patch_reader_methods(main_prog_var) + if use_double_buffer: + double_buffer_reader = double_buffer(reader, name=double_buffer_name) + # we return a double buffer reader. However, the reset method comes from + # py_reader. + double_buffer_reader.reset = reader.reset + reader = double_buffer_reader - >>> import paddle.fluid as fluid - >>> import paddle.dataset.mnist as mnist - >>> - >>> image = fluid.layers.data(name='image', shape=[3,224,224], dtypes='float32') - >>> label = fluid.layers.data(name='label', shape=[1], dtypes='int64') - >>> reader = fluid.layers.create_py_reader_by_data(capacity=64, feed_list=[image, label]) - >>> reader.decorate_paddle_reader( - >>> paddle.reader.shuffle(paddle.batch(mnist.train()) - >>> - >>> img, label = fluid.layers.read_file(reader) - >>> loss = network(img, label) # some network definition - >>> - >>> fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program()) - >>> - >>> exe = fluid.ParallelExecutor(use_cuda=True, loss_name=loss.name) - >>> for epoch_id in range(10): - >>> reader.start() - >>> try: - >>> while True: - >>> exe.run(fetch_list=[loss.name]) - >>> except fluid.core.EOFException: - >>> reader.reset() - """ - return _py_reader( - capacity=capacity, - shapes=None, - dtypes=None, - lod_levels=None, - name=name, - use_double_buffer=use_double_buffer, - feed_list=feed_list) + # monkey patch py_reader special methods + reader.queue = feed_queue + current_reset_method = reader.reset + reader.thread = None + reader.tensor_provider = None + reader.exited = False + + def start_provide_thread(func): + def __provider_thread__(): + for tensors in func(): + array = core.LoDTensorArray() + for item in tensors: + if not isinstance(item, core.LoDTensor): + tmp = core.LoDTensor() + tmp.set(item, core.CPUPlace()) + item = tmp + + array.append(item) + + if reader.exited: + break + feed_queue.push(array) + if reader.exited: + break + feed_queue.close() + + reader.thread = threading.Thread(target=__provider_thread__) + reader.thread.daemon = True + reader.thread.start() + + def __set_tensor_provider__(func): + reader.tensor_provider = func + + def __set_paddle_reader__(paddle_reader): + with program_guard(Program(), Program()): + feed_list = [] + counter = 0 + for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels): + name = str(counter) + feed_list.append( + data( + name=name, + dtype=dtype, + shape=shape, + lod_level=lod_level)) + counter += 1 + + feeder = DataFeeder(feed_list=feed_list, place=core.CPUPlace()) + paddle_reader = feeder.decorate_reader( + paddle_reader, multi_devices=False) + + def __tensor_provider__(): + for slots in paddle_reader(): + yield [slots[str(idx)] for idx in six.moves.xrange(counter)] + + __set_tensor_provider__(__tensor_provider__) + + def __reset__(): + current_reset_method() + if reader.thread is not None and 
reader.tensor_provider is not None: + reader.exited = True + reader.thread.join() + reader.exited = False + + def __start__(): + start_provide_thread(reader.tensor_provider) + + reader.reset = __reset__ + reader.decorate_tensor_provider = __set_tensor_provider__ + reader.decorate_paddle_reader = __set_paddle_reader__ + reader.start = __start__ + + return reader def open_files(filenames, diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py index b85b94c939..b7fad9b3a6 100644 --- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py +++ b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py @@ -53,22 +53,13 @@ def simple_fc_net(in_size, hidden_sizes, batch_size, queue_capacity, - use_double_buffer=False, - use_feed_list=True): - if use_feed_list: - data = fluid.layers.data(name="data", dtype='float32', shape=[in_size]) - label = fluid.layers.data(name='label', dtype='int64', shape=[1]) - reader = fluid.layers.create_py_reader_by_data( - capacity=queue_capacity, - use_double_buffer=False, - feed_list=[data, label]) - else: - reader = fluid.layers.py_reader( - capacity=queue_capacity, - shapes=[[-1, in_size], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - use_double_buffer=False) + use_double_buffer=False): + reader = fluid.layers.py_reader( + capacity=queue_capacity, + shapes=[[-1, in_size], [-1, 1]], + lod_levels=[0, 0], + dtypes=['float32', 'int64'], + use_double_buffer=False) feed_queue = reader.queue reader = fluid.layers.batch(reader, batch_size=batch_size) if use_double_buffer: @@ -109,16 +100,14 @@ class TestPyReaderUsingExecutor(unittest.TestCase): if core.is_compiled_with_cuda() else [False]): for use_parallel_executor in [False, True]: for use_double_buffer in [False, True]: - for use_feed_list in [False, True]: - print('Test Parameters:'), - print({ - 'use_cuda': use_cuda, - 'use_parallel_executor': use_parallel_executor, - 'use_double_buffer': use_double_buffer, - 'use_feed_list': use_feed_list - }) - self.main(use_cuda, use_parallel_executor, - use_double_buffer, use_feed_list) + print('Test Parameters:'), + print({ + 'use_cuda': use_cuda, + 'use_parallel_executor': use_parallel_executor, + 'use_double_buffer': use_double_buffer + }) + self.main(use_cuda, use_parallel_executor, + use_double_buffer) def random_reader(self): def reader(): @@ -154,14 +143,12 @@ class TestPyReaderUsingExecutor(unittest.TestCase): def main(self, use_cuda=True, use_parallel_executor=False, - use_double_buffer=False, - use_feed_list=False): + use_double_buffer=False): assert not use_cuda or use_cuda and core.is_compiled_with_cuda() self.use_cuda = use_cuda self.use_parallel_executor = use_parallel_executor self.use_double_buffer = use_double_buffer - self.use_feed_list = use_feed_list startup_program = fluid.Program() main_program = fluid.Program() @@ -173,8 +160,7 @@ class TestPyReaderUsingExecutor(unittest.TestCase): hidden_sizes=self.hidden_sizes, batch_size=self.batch_size, queue_capacity=self.queue_capacity, - use_double_buffer=self.use_double_buffer, - use_feed_list=self.use_feed_list) + use_double_buffer=self.use_double_buffer) place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() -- GitLab
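Usage after this revert (reviewer note): with create_py_reader_by_data removed, the only entry point is the shapes/dtypes form of py_reader, exercised by the updated test above. Below is a minimal sketch of that pattern against the restored fluid 1.x API; the toy network and the fake_reader data source are illustrative stand-ins, not part of this patch.

    import numpy as np
    import paddle
    import paddle.fluid as fluid

    # Restored interface: py_reader is configured with explicit shapes,
    # dtypes and lod_levels rather than a feed_list of Variables
    # (create_py_reader_by_data no longer exists after this revert).
    reader = fluid.layers.py_reader(
        capacity=64,                     # blocking-queue capacity
        shapes=[[-1, 32], [-1, 1]],      # -1 is the batch dimension, as in the test
        dtypes=['float32', 'int64'],
        lod_levels=[0, 0],
        use_double_buffer=True)

    img, label = fluid.layers.read_file(reader)
    hidden = fluid.layers.fc(input=img, size=64, act='relu')
    prediction = fluid.layers.fc(input=hidden, size=10, act='softmax')
    loss = fluid.layers.mean(
        fluid.layers.cross_entropy(input=prediction, label=label))

    # fake_reader is a stand-in sample generator; any paddle-style reader
    # yielding (feature, label) tuples works here.
    def fake_reader():
        for _ in range(1000):
            yield (np.random.random([32]).astype('float32'),
                   np.random.randint(0, 10, [1]).astype('int64'))

    # decorate_paddle_reader expects a batched reader; internally the restored
    # code wraps it with a DataFeeder and a background provider thread.
    reader.decorate_paddle_reader(paddle.batch(fake_reader, batch_size=16))

    exe = fluid.Executor(fluid.CPUPlace())
    exe.run(fluid.default_startup_program())

    for epoch in range(2):
        reader.start()                   # spawns the provider thread
        try:
            while True:
                exe.run(fetch_list=[loss.name])
        except fluid.core.EOFException:
            reader.reset()               # joins the thread, reopens the queue

Note that the restored code monkey-patches reset() to join the provider thread before reopening the queue, which is why the EOFException handler calls reset() and each epoch then calls start() again.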