From a0b91c7b3d58c0b20fb37a60b090fccb953d37f0 Mon Sep 17 00:00:00 2001 From: Roc <30228238+sljlp@users.noreply.github.com> Date: Fri, 2 Dec 2022 11:19:32 +0800 Subject: [PATCH] [Clean Fluid]Remove py_reader/double_buffer/create_py_reader_by_data/load from fluid.layer.io (#48589) rm py_reader/double_buffer/create_py_reader_by_data/load rm test_load_xpu --- python/paddle/fluid/layers/io.py | 543 ------------------ python/paddle/fluid/reader.py | 16 +- .../fluid/tests/unittests/CMakeLists.txt | 1 - .../fluid/tests/unittests/test_load_op.py | 73 --- .../fluid/tests/unittests/test_load_op_xpu.py | 73 --- .../fluid/tests/unittests/test_program.py | 65 --- .../unittests/test_py_reader_error_msg.py | 60 -- .../test_py_reader_lod_level_share.py | 45 -- .../unittests/test_py_reader_pin_memory.py | 138 ----- .../unittests/test_py_reader_push_pop.py | 111 ---- .../test_py_reader_using_executor.py | 338 ----------- 11 files changed, 5 insertions(+), 1458 deletions(-) delete mode 100644 python/paddle/fluid/tests/unittests/test_load_op.py delete mode 100644 python/paddle/fluid/tests/unittests/test_load_op_xpu.py delete mode 100644 python/paddle/fluid/tests/unittests/test_py_reader_error_msg.py delete mode 100644 python/paddle/fluid/tests/unittests/test_py_reader_lod_level_share.py delete mode 100644 python/paddle/fluid/tests/unittests/test_py_reader_pin_memory.py delete mode 100644 python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py delete mode 100644 python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py diff --git a/python/paddle/fluid/layers/io.py b/python/paddle/fluid/layers/io.py index 2259044e14e..e33cf23e6c1 100644 --- a/python/paddle/fluid/layers/io.py +++ b/python/paddle/fluid/layers/io.py @@ -43,11 +43,6 @@ from ..framework import ( __all__ = [ 'data', - 'read_file', - 'double_buffer', - 'py_reader', - 'create_py_reader_by_data', - 'load', ] @@ -408,441 +403,6 @@ def _copy_reader_create_op_(block, op): return new_op -def _py_reader( - capacity, - shapes, - dtypes, - lod_levels=None, - name=None, - use_double_buffer=True, - feed_list=None, -): - if feed_list is not None: - if not isinstance(feed_list, list): - raise TypeError( - "feed_list should be a list of Variable" - " instead of " + str(type(feed_list)) - ) - lod_levels = [] - dtypes = [] - shape_concat = [] - ranks = [] - shapes = [] - need_check_feed = [] - - for feed_data in feed_list: - dtypes.append(feed_data.dtype) - shape_concat.extend(feed_data.shape) - ranks.append(len(feed_data.shape)) - shapes.append(feed_data.shape) - lod_levels.append(feed_data.lod_level) - need_check_feed.append(int(feed_data.desc.need_check_feed())) - else: - dtypes = [convert_np_dtype_to_dtype_(dt) for dt in dtypes] - need_check_feed = [0 for dt in dtypes] - shape_concat = [] - ranks = [] - - for shape in shapes: - shape_concat.extend(shape) - ranks.append(len(shape)) - - if lod_levels is None: - lod_levels = [0] * len(shapes) - dtype_int = [int(t) for t in dtypes] - if name is None: - queue_name = unique_name('lod_tensor_blocking_queue') - reader_name = unique_name('create_py_reader') - double_buffer_name = unique_name('double_buffer') - else: - queue_name = "_".join([name, "queue"]) - reader_name = "_".join([name, "reader"]) - double_buffer_name = "_".join([name, "double_buffer"]) - - var = global_scope().var(queue_name) - feed_queue = core.init_lod_tensor_blocking_queue(var, capacity, False) - - startup_blk = default_startup_program().current_block() - startup_var = startup_blk.create_var(name=reader_name) - startup_blk.append_op( - type='create_py_reader', - inputs={'blocking_queue': [queue_name]}, - outputs={'Out': [startup_var]}, - attrs={ - 'shape_concat': shape_concat, - 'lod_levels': lod_levels, - 'dtypes': dtype_int, - 'need_check_feed': need_check_feed, - 'ranks': ranks, - }, - ) - - startup_var.desc.set_dtypes(dtypes) - startup_var.persistable = True - - main_prog_var = _copy_reader_var_( - default_main_program().current_block(), startup_var - ) - - reader = monkey_patch_reader_methods(main_prog_var) - if use_double_buffer: - double_buffer_reader = double_buffer(reader, name=double_buffer_name) - # we return a double buffer reader. However, the reset method comes from - # py_reader. - double_buffer_reader.reset = reader.reset - reader = double_buffer_reader - - # monkey patch py_reader special methods - reader.queue = feed_queue - current_reset_method = reader.reset - reader.thread = None - reader.tensor_provider = None - reader.exited = False - - def start_provide_thread(func): - def __provider_thread__(legacy_expected_place): - try: - # See _DataLoaderIterSingleProcess._thread_loop() for why set expected place here. - - _set_expected_place(legacy_expected_place) - - for tensors in func(): - array = core.LoDTensorArray() - for item in tensors: - if not isinstance(item, core.LoDTensor): - tmp = core.LoDTensor() - tmp.set(item, core.CPUPlace()) - item = tmp - - array.append(item) - - if reader.exited: - break - feed_queue.push(array) - if reader.exited: - break - feed_queue.close() - except Exception as e: - feed_queue.kill() - logging.warn('Your decorated reader has raised an exception!') - raise e - - reader.thread = threading.Thread( - target=__provider_thread__, args=(_current_expected_place(),) - ) - reader.thread.daemon = True - reader.thread.start() - - def __set_tensor_provider__(func): - reader.tensor_provider = func - - def __set_paddle_reader__(paddle_reader): - with program_guard(Program(), Program()): - actual_feed_list = feed_list - if actual_feed_list is None: - actual_feed_list = [] - counter = 0 - for dtype, shape, lod_level in zip(dtypes, shapes, lod_levels): - name = str(counter) - actual_feed_list.append( - data( - name=name, - dtype=dtype, - shape=shape, - lod_level=lod_level, - ) - ) - counter += 1 - - data_names = [feed_data.name for feed_data in actual_feed_list] - feeder = DataFeeder( - feed_list=actual_feed_list, place=core.CPUPlace() - ) - paddle_reader = feeder.decorate_reader( - paddle_reader, multi_devices=False - ) - - def __tensor_provider__(): - for slots in paddle_reader(): - yield [slots[data_name] for data_name in data_names] - - __set_tensor_provider__(__tensor_provider__) - - def __reset__(): - current_reset_method() - if reader.thread is not None and reader.tensor_provider is not None: - reader.exited = True - reader.thread.join() - reader.exited = False - - def __start__(): - start_provide_thread(reader.tensor_provider) - - reader.reset = __reset__ - reader.decorate_tensor_provider = __set_tensor_provider__ - reader.decorate_paddle_reader = __set_paddle_reader__ - - reader.decorate_batch_generator = __set_tensor_provider__ - reader.decorate_sample_list_generator = __set_paddle_reader__ - reader.start = __start__ - - return reader - - -def py_reader( - capacity, shapes, dtypes, lod_levels=None, name=None, use_double_buffer=True -): - """ - :api_attr: Static Graph - - Create a Python reader for data feeding in Python - - This operator returns a Reader Variable. - The Reader provides :code:`decorate_paddle_reader()` and - :code:`decorate_tensor_provider()` to set a Python generator as the data - source and feed the data from the data source to the Reader Variable. - When :code:`Executor::Run()` is invoked in C++ side, the data from the - generator would be read automatically. Unlike :code:`DataFeeder.feed()`, - the data reading process and :code:`Executor::Run()` process can run in - parallel using :code:`py_reader`. The :code:`start()` method of the Reader - should be called when each pass begins, while the :code:`reset()` method - should be called when the pass ends and :code:`fluid.core.EOFException` raises. - - Note: - :code:`Program.clone()` method cannot clone :code:`py_reader`. You can - refer to :ref:`api_fluid_Program` for more details. - - The :code:`read_file` call needs to be in the program block of :code:`py_reader`. - You can refer to :ref:`api_fluid_layers_read_file` for more details. - - Args: - capacity(int): The buffer capacity maintained by :code:`py_reader`. - shapes(list|tuple): List of tuples which declaring data shapes. shapes[i] - represents the i-th data shape. - dtypes(list|tuple): List of strings which declaring data type. Supported dtype: - bool, float16, float32, float64, int8, int16, int32, int64, uint8. - lod_levels(list|tuple): List of ints which declaring data lod_level. - name(basestring): The default value is None. Normally there is no - need for user to set this property. For more information, please - refer to :ref:`api_guide_Name`. - use_double_buffer(bool): Whether use double buffer or not. The double buffer is - for pre-reading the data of the next batch and copy the data asynchronously - from CPU to GPU. Default is True. - - Returns: - A Reader from which we can get feeding data. - - Return Type: - Variable - - Examples: - 1. The basic usage of :code:`py_reader` is as follows: - - .. code-block:: python - - import paddle - import paddle.fluid as fluid - import paddle.dataset.mnist as mnist - - def network(image, label): - # user defined network, here a softmax regession example - predict = fluid.layers.fc(input=image, size=10, act='softmax') - return fluid.layers.cross_entropy(input=predict, label=label) - - reader = fluid.layers.py_reader(capacity=64, - shapes=[(-1, 1, 28, 28), (-1, 1)], - dtypes=['float32', 'int64']) - reader.decorate_paddle_reader( - paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=5), - buf_size=1000)) - - img, label = fluid.layers.read_file(reader) - loss = network(img, label) - - fluid.Executor(fluid.CUDAPlace(0)).run(fluid.default_startup_program()) - exe = fluid.ParallelExecutor(use_cuda=True) - for epoch_id in range(10): - reader.start() - try: - while True: - exe.run(fetch_list=[loss.name]) - except fluid.core.EOFException: - reader.reset() - - fluid.io.save_inference_model(dirname='./model', - feeded_var_names=[img.name, label.name], - target_vars=[loss], - executor=fluid.Executor(fluid.CUDAPlace(0))) - - 2. When training and testing are both performed, two different - :code:`py_reader` should be created with different names, e.g.: - - .. code-block:: python - - import paddle - import paddle.fluid as fluid - import paddle.dataset.mnist as mnist - - def network(reader): - img, label = fluid.layers.read_file(reader) - # User defined network. Here a simple regression as example - predict = fluid.layers.fc(input=img, size=10, act='softmax') - loss = fluid.layers.cross_entropy(input=predict, label=label) - return fluid.layers.mean(loss) - - # Create train_main_prog and train_startup_prog - train_main_prog = fluid.Program() - train_startup_prog = fluid.Program() - with fluid.program_guard(train_main_prog, train_startup_prog): - # Use fluid.unique_name.guard() to share parameters with test program - with fluid.unique_name.guard(): - train_reader = fluid.layers.py_reader(capacity=64, - shapes=[(-1, 1, 28, 28), - (-1, 1)], - dtypes=['float32', 'int64'], - name='train_reader') - train_reader.decorate_paddle_reader( - paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=5), - buf_size=500)) - train_loss = network(train_reader) # some network definition - adam = fluid.optimizer.Adam(learning_rate=0.01) - adam.minimize(train_loss) - - # Create test_main_prog and test_startup_prog - test_main_prog = fluid.Program() - test_startup_prog = fluid.Program() - with fluid.program_guard(test_main_prog, test_startup_prog): - # Use fluid.unique_name.guard() to share parameters with train program - with fluid.unique_name.guard(): - test_reader = fluid.layers.py_reader(capacity=32, - shapes=[(-1, 1, 28, 28), (-1, 1)], - dtypes=['float32', 'int64'], - name='test_reader') - test_reader.decorate_paddle_reader(paddle.batch(mnist.test(), 512)) - test_loss = network(test_reader) - - fluid.Executor(fluid.CUDAPlace(0)).run(train_startup_prog) - fluid.Executor(fluid.CUDAPlace(0)).run(test_startup_prog) - - train_exe = fluid.ParallelExecutor(use_cuda=True, - loss_name=train_loss.name, - main_program=train_main_prog) - test_exe = fluid.ParallelExecutor(use_cuda=True, - loss_name=test_loss.name, - main_program=test_main_prog) - for epoch_id in range(10): - train_reader.start() - try: - while True: - train_exe.run(fetch_list=[train_loss.name]) - except fluid.core.EOFException: - train_reader.reset() - - test_reader.start() - try: - while True: - test_exe.run(fetch_list=[test_loss.name]) - except fluid.core.EOFException: - test_reader.reset() - """ - logging.warn( - 'paddle.fluid.layers.py_reader() may be deprecated in the near future. ' - 'Please use paddle.fluid.io.DataLoader.from_generator() instead.' - ) - return _py_reader( - capacity=capacity, - shapes=shapes, - dtypes=dtypes, - lod_levels=lod_levels, - name=name, - use_double_buffer=use_double_buffer, - ) - - -def create_py_reader_by_data( - capacity, feed_list, name=None, use_double_buffer=True -): - """ - :api_attr: Static Graph - - The OP creates a Python reader for data feeding in Python, it is similar - to :ref:`api_fluid_layers_py_reader` except that it can read data from - the list of feed variables. - - Parameters: - capacity (int): The buffer capacity maintained by :code:`py_reader`. Its unit - is batch number. Set larger :attr:`capacity` if the reader is fast. - feed_list (list(Variable)): The feed variables, are usually created by - :code:`fluid.data()`. - name (str, optional): Normally there is no need for user to set this property. - For more information, please refer to :ref:`api_guide_Name`. Default: None. - use_double_buffer (bool, optional): Whether use double buffer. If it's True, - the OP would prefetch next batch data asynchronously. Default: True. - - Returns: - Reader: A Reader for data feeding. The data types of read data are the same as the data types of variables of :attr:`feed_list`. - - Examples: - .. code-block:: python - - import paddle - import paddle.fluid as fluid - import paddle.dataset.mnist as mnist - - def network(img, label): - # User defined network. Here a simple regression as example - predict = fluid.layers.fc(input=img, size=10, act='softmax') - loss = fluid.layers.cross_entropy(input=predict, label=label) - return fluid.layers.mean(loss) - - MEMORY_OPT = False - USE_CUDA = False - - image = fluid.data(name='image', shape=[None, 1, 28, 28], dtype='float32') - label = fluid.data(name='label', shape=[None, 1], dtype='int64') - reader = fluid.layers.create_py_reader_by_data(capacity=64, - feed_list=[image, label]) - reader.decorate_paddle_reader( - paddle.reader.shuffle(paddle.batch(mnist.train(), batch_size=5), buf_size=500)) - img, label = fluid.layers.read_file(reader) - loss = network(img, label) # The definition of custom network and the loss function - - place = fluid.CUDAPlace(0) if USE_CUDA else fluid.CPUPlace() - exe = fluid.Executor(place) - exe.run(fluid.default_startup_program()) - - build_strategy = fluid.BuildStrategy() - build_strategy.memory_optimize = True if MEMORY_OPT else False - exec_strategy = fluid.ExecutionStrategy() - compiled_prog = fluid.compiler.CompiledProgram( - fluid.default_main_program()).with_data_parallel( - loss_name=loss.name, - build_strategy=build_strategy, - exec_strategy=exec_strategy) - - for epoch_id in range(2): - reader.start() - try: - while True: - exe.run(compiled_prog, fetch_list=[loss.name]) - except fluid.core.EOFException: - reader.reset() - """ - logging.warn( - 'paddle.fluid.layers.create_py_reader_by_data() may be deprecated in the near future. ' - 'Please use paddle.fluid.io.DataLoader.from_generator() instead.' - ) - return _py_reader( - capacity=capacity, - shapes=None, - dtypes=None, - lod_levels=None, - name=name, - use_double_buffer=use_double_buffer, - feed_list=feed_list, - ) - - def __create_shared_decorated_reader__(op_type, reader, attrs): var_name = unique_name(op_type) startup_blk = default_startup_program().current_block() @@ -871,106 +431,3 @@ def __create_unshared_decorated_reader__(op_type, reader, attrs, name=None): attrs=attrs, ) return monkey_patch_reader_methods(new_reader) - - -def double_buffer(reader, place=None, name=None): - """ - Wrap a double buffer reader. The class Reader contains DecoratedReader and FileReader. Moreover, the DecoratedReader is inherited by CustomReader and BufferedReader. This function is related to BufferedReader. The data will copy to target place with a double buffer queue. If the target place is None, the place that executor perform on will be used. - - - Args: - reader (Variable): The Reader Variable need to be wrapped. - place (Place|str, optional): The place of target data, such as CPU, GPU, and if use GPU, it's necessary to point out which card is involved. Default is the sample place of executor perform. - if ``place`` is string, It can be ``cpu``, ``gpu:x``, where ``x`` is the ndex of the GPUs. - name (str, optional): Variable name. Normally there is no need for user to set this property. For more information, please refer to :ref:`api_guide_Name`. Default is None. - - Returns: - Variable(Reader): wrapped reader with double buffer. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - reader = fluid.layers.py_reader(capacity=64, - shapes=[(-1, 1, 28, 28), (-1, 1)], - dtypes=['float32', 'int64'], - use_double_buffer=False) - reader = fluid.layers.double_buffer(reader) - image, label = fluid.layers.read_file(reader) - """ - attrs = dict() - if place is not None: - attrs['place'] = str(_get_paddle_place(place)).upper() - - return __create_unshared_decorated_reader__( - 'create_double_buffer_reader', reader, attrs, name=name - ) - - -def read_file(reader): - """ - :api_attr: Static Graph - - Execute the given reader and get data via it. - - A reader is also a Variable. It can be a raw reader generated by - `fluid.layers.open_files()` or a decorated one generated by - `fluid.layers.double_buffer()` . - - Args: - - reader(Variable): The reader to execute. - - Returns: - Tuple[Variable]: Data read from the given reader. - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - reader = fluid.layers.py_reader(capacity=64, - shapes=[(-1, 1, 28, 28), (-1, 1)], - dtypes=['float32', 'int64']) - image, label = fluid.layers.read_file(reader) - """ - helper = LayerHelper('read_file') - out = [ - helper.create_variable_for_type_inference( - stop_gradient=True, dtype='float32' - ) - for _ in range(len(reader.desc.shapes())) - ] - helper.append_op( - type='read', inputs={'Reader': [reader]}, outputs={'Out': out} - ) - if len(out) == 1: - return out[0] - else: - return out - - -def load(out, file_path, load_as_fp16=None): - """ - Load operator will load a LoDTensor / SelectedRows variable from disk file. - - Args: - out(Variable): The LoDTensor / SelectedRows need to be loaded.. - - file_path(STRING): Variable will be loaded from "file_path". - - load_as_fp16(BOOLEAN): If true, the tensor will be first loaded and then converted to float16 data type. Otherwise, the tensor will be directly loaded without data type conversion. Default is false.. - Returns: - None - - Examples: - .. code-block:: python - - import paddle.fluid as fluid - tmp_tensor = fluid.layers.create_tensor(dtype='float32') - fluid.layers.load(tmp_tensor, "./tmp_tensor.bin") - """ - helper = LayerHelper("load", **locals()) - attrs = {"file_path": file_path} - if load_as_fp16 is not None: - attrs['load_as_fp16'] = load_as_fp16 - helper.append_op(type="load", inputs={}, outputs={"Out": out}, attrs=attrs) diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index b6b774d5ba1..4883d70d97d 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -51,7 +51,6 @@ from .dataloader.batch_sampler import _InfiniteIterableSampler from .layers.io import ( monkey_patch_reader_methods, _copy_reader_var_, - double_buffer, ) from .unique_name import UniqueNameGenerator from .framework import _get_paddle_place, _get_paddle_place_list @@ -1352,6 +1351,11 @@ class GeneratorLoader(DataLoaderBase): self._use_double_buffer = use_double_buffer self._capacity = capacity if not self._iterable: + # Because layers.io.double_buffer is not supported anymore, and only when iterable and use_double_buffer + # are both True layers.io.double_buffer will be in use, here if itrable is False, use_double_buffer will be + # forcely set False to avoid using layers.io.double_buffer. + # TODO: keep use_double_buffer + self._use_double_buffer = False self._init_non_iterable() def _wait_thread_ends(self): @@ -1406,7 +1410,6 @@ class GeneratorLoader(DataLoaderBase): 'lod_tensor_blocking_queue' ) reader_name = data_loader_unique_name_generator('create_py_reader') - double_buffer_name = data_loader_unique_name_generator('double_buffer') var = global_scope().var(queue_name) self._queue = core.init_lod_tensor_blocking_queue( @@ -1452,15 +1455,6 @@ class GeneratorLoader(DataLoaderBase): reader = monkey_patch_reader_methods(main_prog_var) - if self._use_double_buffer: - double_buffer_reader = double_buffer( - reader, name=double_buffer_name - ) - # we return a double buffer reader. However, the reset method comes from - # py_reader. - double_buffer_reader.reset = reader.reset - reader = double_buffer_reader - self._reader = reader default_main_program().current_block().append_op( diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 61e9917359b..9fff40e1685 100755 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -1079,7 +1079,6 @@ set_tests_properties(test_nan_inf PROPERTIES TIMEOUT 120) set_tests_properties(test_deformable_conv_v1_op PROPERTIES TIMEOUT 300) set_tests_properties(test_parallel_executor_transformer_auto_growth PROPERTIES TIMEOUT 120) -set_tests_properties(test_py_reader_using_executor PROPERTIES TIMEOUT 120) set_tests_properties(test_elementwise_add_op PROPERTIES TIMEOUT 120) set_tests_properties(test_weight_decay PROPERTIES TIMEOUT 120) set_tests_properties(test_imperative_ptb_rnn_sorted_gradient PROPERTIES TIMEOUT diff --git a/python/paddle/fluid/tests/unittests/test_load_op.py b/python/paddle/fluid/tests/unittests/test_load_op.py deleted file mode 100644 index acf0a810293..00000000000 --- a/python/paddle/fluid/tests/unittests/test_load_op.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import tempfile -import unittest - -import numpy as np - -import paddle -import paddle.fluid as fluid -import paddle.fluid.layers as layers - - -class TestLoadOp(unittest.TestCase): - """Test load operator.""" - - def setUp(self): - self.temp_dir = tempfile.TemporaryDirectory() - self.ones = np.ones((4, 4)).astype('float32') - main_prog = fluid.Program() - start_prog = fluid.Program() - with fluid.program_guard(main_prog, start_prog): - input = fluid.data('input', shape=[-1, 4], dtype='float32') - output = layers.fc( - input, - 4, - param_attr=fluid.ParamAttr( - name='w', - initializer=fluid.initializer.NumpyArrayInitializer( - self.ones - ), - ), - ) - exe = fluid.Executor(fluid.CPUPlace()) - exe.run(start_prog) - paddle.distributed.io.save_persistables( - exe, - dirname=os.path.join(self.temp_dir.name, "./model"), - main_program=main_prog, - ) - - def tearDown(self): - self.temp_dir.cleanup() - - def test_load(self): - main_prog = fluid.Program() - start_prog = fluid.Program() - with fluid.program_guard(main_prog, start_prog): - var = layers.create_tensor(dtype='float32') - layers.load( - var, file_path=os.path.join(self.temp_dir.name, './model/w') - ) - - exe = fluid.Executor(fluid.CPUPlace()) - exe.run(start_prog) - ret = exe.run(main_prog, fetch_list=[var.name]) - np.testing.assert_array_equal(self.ones, ret[0]) - - -if __name__ == "__main__": - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_load_op_xpu.py b/python/paddle/fluid/tests/unittests/test_load_op_xpu.py deleted file mode 100644 index 3ede3b26914..00000000000 --- a/python/paddle/fluid/tests/unittests/test_load_op_xpu.py +++ /dev/null @@ -1,73 +0,0 @@ -# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import tempfile -import unittest - -import numpy as np - -import paddle -import paddle.fluid as fluid -import paddle.fluid.layers as layers - - -@unittest.skipIf( - not paddle.is_compiled_with_xpu(), "core is not compiled with XPU" -) -class TestLoadOpXpu(unittest.TestCase): - """Test load operator.""" - - def setUp(self): - self.temp_dir = tempfile.TemporaryDirectory() - self.model_path = os.path.join(self.temp_dir.name, "model") - self.ones = np.ones((4, 4)).astype('float32') - main_prog = fluid.Program() - start_prog = fluid.Program() - with fluid.program_guard(main_prog, start_prog): - input = fluid.data('input', shape=[-1, 4], dtype='float32') - output = layers.fc( - input, - 4, - param_attr=fluid.ParamAttr( - name='w', - initializer=fluid.initializer.NumpyArrayInitializer( - self.ones - ), - ), - ) - exe = fluid.Executor(fluid.XPUPlace(0)) - exe.run(start_prog) - paddle.distributed.io.save_persistables( - exe, dirname=self.model_path, main_program=main_prog - ) - - def tearDown(self): - self.temp_dir.cleanup() - - def test_load_xpu(self): - main_prog = fluid.Program() - start_prog = fluid.Program() - with fluid.program_guard(main_prog, start_prog): - var = layers.create_tensor(dtype='float32') - layers.load(var, file_path=self.model_path + '/w') - - exe = fluid.Executor(fluid.XPUPlace(0)) - exe.run(start_prog) - ret = exe.run(main_prog, fetch_list=[var.name]) - np.testing.assert_array_equal(self.ones, ret[0]) - - -if __name__ == "__main__": - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_program.py b/python/paddle/fluid/tests/unittests/test_program.py index 1b38cf4f5fa..54320aee59b 100644 --- a/python/paddle/fluid/tests/unittests/test_program.py +++ b/python/paddle/fluid/tests/unittests/test_program.py @@ -105,41 +105,6 @@ class TestProgram(unittest.TestCase): new_program = main_program.clone() self.assertNotEqual(0, len(new_program.blocks[0].all_parameters())) - def test_program_inference_optimize(self): - def net(): - reader = fluid.layers.py_reader( - capacity=10, - shapes=[[-1, 10], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - use_double_buffer=True, - ) - in_data, label = fluid.layers.read_file(reader) - predict_label = fluid.layers.fc(in_data, size=2, act='softmax') - loss = paddle.mean( - fluid.layers.cross_entropy(input=predict_label, label=label) - ) - - optimizer = fluid.optimizer.Adam() - optimizer.minimize(loss) - - startup_program = fluid.Program() - main_program = fluid.Program() - with fluid.program_guard(main_program, startup_program): - net() - no_read_program = main_program._inference_optimize() - keep_read_program = main_program._inference_optimize( - prune_read_op=False - ) - no_read_ops = no_read_program.global_block().ops - keep_read_ops = keep_read_program.global_block().ops - self.assertEqual(len(keep_read_ops) - len(no_read_ops), 2) - self.assertEqual(keep_read_ops[0].type, 'create_double_buffer_reader') - self.assertEqual(keep_read_ops[1].type, 'read') - - for i in range(len(no_read_ops)): - self.assertEqual(no_read_ops[i].type, keep_read_ops[i + 2].type) - def test_program_all_parameters(self): program = fluid.default_main_program() data = fluid.data(name='x', shape=[None, 13], dtype='float32') @@ -172,36 +137,6 @@ class TestProgram(unittest.TestCase): TypeError, program._copy_dist_param_info_from, "program" ) - def test_remove_training_info(self): - def net(): - reader = fluid.layers.py_reader( - capacity=10, - shapes=[[-1, 10], [-1, 1]], - lod_levels=[0, 0], - dtypes=['float32', 'int64'], - use_double_buffer=True, - ) - in_data, label = fluid.layers.read_file(reader) - predict_label = fluid.layers.fc(in_data, size=2, act='softmax') - loss = paddle.mean( - fluid.layers.cross_entropy(input=predict_label, label=label) - ) - - optimizer = fluid.optimizer.Adam() - optimizer.minimize(loss) - - main_program = fluid.Program() - with fluid.program_guard(main_program): - net() - - removed_program = main_program._remove_training_info() - - for i in range(removed_program.num_blocks): - block = removed_program.block(i) - for var in block.desc.all_vars(): - self.assertFalse(var.has_is_parameter()) - self.assertFalse(var.has_stop_gradient()) - def build_program(): main_program = paddle.static.Program() diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_error_msg.py b/python/paddle/fluid/tests/unittests/test_py_reader_error_msg.py deleted file mode 100644 index 0a198647dcc..00000000000 --- a/python/paddle/fluid/tests/unittests/test_py_reader_error_msg.py +++ /dev/null @@ -1,60 +0,0 @@ -# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -import numpy as np - -import paddle -import paddle.fluid as fluid - - -class TestPyReaderErrorMsg(unittest.TestCase): - def test_check_input_array(self): - fluid.reader.GeneratorLoader._check_input_array( - [ - np.random.randint(100, size=[2]), - np.random.randint(100, size=[2]), - np.random.randint(100, size=[2]), - ] - ) - self.assertRaises( - TypeError, - fluid.reader.GeneratorLoader._check_input_array, - [ - np.random.randint(100, size=[2]), - np.random.randint(100, size=[1]), - np.random.randint(100, size=[3]), - ], - ) - - -class TestDoubleBufferAPI(unittest.TestCase): - def test_double_buffer(self): - paddle.enable_static() - if fluid.core.is_compiled_with_cuda(): - reader = fluid.layers.py_reader( - capacity=64, - shapes=[(-1, 1, 28, 28), (-1, 1)], - dtypes=['float32', 'int64'], - use_double_buffer=False, - ) - reader = fluid.layers.double_buffer( - reader, place=fluid.core.CUDAPlace(0) - ) - image, label = fluid.layers.read_file(reader) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_lod_level_share.py b/python/paddle/fluid/tests/unittests/test_py_reader_lod_level_share.py deleted file mode 100644 index c7fb6a8df59..00000000000 --- a/python/paddle/fluid/tests/unittests/test_py_reader_lod_level_share.py +++ /dev/null @@ -1,45 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -import paddle.fluid as fluid - - -class TestLoDLevelShare(unittest.TestCase): - def setUp(self): - self.use_double_buffer = False - - def test_lod_level_share(self): - reader = fluid.layers.py_reader( - capacity=16, - shapes=([-1, 256], [-1, 512], [-1, 100]), - dtypes=('float32', 'int64', 'double'), - lod_levels=(1, 2, 0), - use_double_buffer=self.use_double_buffer, - ) - - x, y, z = fluid.layers.read_file(reader) - self.assertEqual(x.lod_level, 1) - self.assertEqual(y.lod_level, 2) - self.assertEqual(z.lod_level, 0) - - -class TestLoDLevelShare2(TestLoDLevelShare): - def setUp(self): - self.use_double_buffer = True - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_pin_memory.py b/python/paddle/fluid/tests/unittests/test_py_reader_pin_memory.py deleted file mode 100644 index 509d5f65292..00000000000 --- a/python/paddle/fluid/tests/unittests/test_py_reader_pin_memory.py +++ /dev/null @@ -1,138 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -import numpy as np - -import paddle -import paddle.fluid as fluid -import paddle.fluid.core as core - - -def user_reader(inputs): - def _reader(): - for d in inputs: - yield d - - return _reader - - -def batch_feeder(batch_reader, pin_memory=False, img_dtype="float32"): - def _feeder(): - for batch_data in batch_reader(): - sample_batch = [] - label_batch = [] - for sample, label in batch_data: - sample_batch.append(sample) - label_batch.append([label]) - tensor = core.LoDTensor() - label = core.LoDTensor() - place = core.CUDAPinnedPlace() if pin_memory else core.CPUPlace() - tensor.set(np.array(sample_batch, dtype=img_dtype), place) - label.set(np.array(label_batch, dtype="int64"), place) - yield [tensor, label] - - return _feeder - - -class TestPyReader(unittest.TestCase): - def setUp(self): - self.capacity = 10 - self.shapes = [(-1, 3, 2, 1), (-1, 1)] - self.lod_levels = [0, 0] - self.dtypes = ['float32', 'int64'] - - def test_pin_memory_pyreader(self): - with fluid.program_guard(fluid.Program(), fluid.Program()): - place = ( - fluid.CUDAPlace(0) - if fluid.core.is_compiled_with_cuda() - else fluid.CPUPlace() - ) - executor = fluid.Executor(place) - - data_file = fluid.layers.py_reader( - capacity=self.capacity, - dtypes=self.dtypes, - lod_levels=self.lod_levels, - shapes=self.shapes, - ) - # feed_queue = data_file.queue - read_out_data = fluid.layers.read_file(data_file) - - self.inputs = [] - for _ in range(10): - sample = np.random.uniform( - low=0, high=1, size=[3, 2, 1] - ).astype("float32") - label = np.random.randint(low=0, high=10, dtype="int64") - self.inputs.append((sample, label)) - - self.input_tensors = [] - for d, l in batch_feeder( - paddle.batch(user_reader(self.inputs), batch_size=2), - pin_memory=True - if fluid.core.is_compiled_with_cuda() - else False, - )(): - ta = fluid.LoDTensorArray() - ta.append(d) - ta.append(l) - self.input_tensors.append(ta) - - self.batched_inputs = [] - for batch in paddle.batch(user_reader(self.inputs), batch_size=2)(): - feed_d = [] - feed_l = [] - for d, l in batch: - feed_d.append(d) - feed_l.append([l]) - self.batched_inputs.append([feed_d, feed_l]) - - data_file.decorate_tensor_provider( - batch_feeder( - paddle.batch(user_reader(self.inputs), batch_size=2), - pin_memory=True - if fluid.core.is_compiled_with_cuda() - else False, - ) - ) - - executor.run(fluid.default_startup_program()) - self.outputs = [] - - data_file.start() - for _ in self.input_tensors: - self.outputs.append( - executor.run(fetch_list=list(read_out_data)) - ) - data_file.reset() - self.validate() - - def validate(self): - self.assertEqual(len(self.batched_inputs), len(self.outputs)) - for in_data_list, out_data_list in zip( - self.batched_inputs, self.outputs - ): - self.assertEqual(len(in_data_list), len(out_data_list)) - in_data_list_np = [ - np.array(in_lod_tensor) for in_lod_tensor in in_data_list - ] - for in_data, out_data in zip(in_data_list_np, out_data_list): - self.assertTrue((in_data == out_data).all()) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py b/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py deleted file mode 100644 index 7f2dc7817c8..00000000000 --- a/python/paddle/fluid/tests/unittests/test_py_reader_push_pop.py +++ /dev/null @@ -1,111 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -from threading import Thread - -import numpy as np - -import paddle.fluid as fluid - - -def feed_data(feed_queue, inputs): - for in_data in inputs: - feed_queue.push(in_data) - - -class TestPyReader(unittest.TestCase): - def setUp(self): - self.capacity = 10 - self.batch_size_min = 10 - self.batch_size_max = 20 - self.shapes = [(-1, 3, 2, 1), (-1, 1)] - self.lod_levels = [0, 0] - self.dtypes = ['float32', 'int64'] - self.iterations = 20 - - def test_single_thread_main(self): - self.main(use_thread=False) - - def test_multiple_thread_main(self): - self.main(use_thread=True) - - def main(self, use_thread=False): - with fluid.program_guard(fluid.Program(), fluid.Program()): - place = ( - fluid.CUDAPlace(0) - if fluid.core.is_compiled_with_cuda() - else fluid.CPUPlace() - ) - executor = fluid.Executor(place) - - data_file = fluid.layers.py_reader( - capacity=self.capacity, - dtypes=self.dtypes, - lod_levels=self.lod_levels, - shapes=self.shapes, - ) - feed_queue = data_file.queue - read_out_data = fluid.layers.read_file(data_file) - self.inputs = [] - - for i in range(self.iterations): - in_data = fluid.LoDTensorArray() - batch_size = np.random.random_integers( - self.batch_size_min, self.batch_size_max - ) - for shape, dtype in zip(self.shapes, self.dtypes): - next_data = np.random.uniform( - low=0, high=1000, size=(batch_size,) + shape[1:] - ).astype(dtype) - in_data.append( - fluid.executor._as_lodtensor(next_data, place) - ) - - self.inputs.append(in_data) - - executor.run(fluid.default_startup_program()) - self.outputs = [] - if use_thread: - thread = Thread( - target=feed_data, args=(feed_queue, self.inputs) - ) - thread.start() - for in_data in self.inputs: - self.outputs.append( - executor.run(fetch_list=list(read_out_data)) - ) - else: - for in_data in self.inputs: - feed_queue.push(in_data) - self.outputs.append( - executor.run(fetch_list=list(read_out_data)) - ) - - feed_queue.close() - self.validate() - - def validate(self): - self.assertEqual(len(self.inputs), len(self.outputs)) - for in_data_list, out_data_list in zip(self.inputs, self.outputs): - self.assertEqual(len(in_data_list), len(out_data_list)) - in_data_list_np = [ - np.array(in_lod_tensor) for in_lod_tensor in in_data_list - ] - for in_data, out_data in zip(in_data_list_np, out_data_list): - self.assertTrue((in_data == out_data).all()) - - -if __name__ == '__main__': - unittest.main() diff --git a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py b/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py deleted file mode 100644 index 01ab760b6e2..00000000000 --- a/python/paddle/fluid/tests/unittests/test_py_reader_using_executor.py +++ /dev/null @@ -1,338 +0,0 @@ -# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import multiprocessing -import os -import threading -import unittest - -import numpy as np - -import paddle -import paddle.fluid as fluid -import paddle.fluid.core as core -import paddle.fluid.unique_name as unique_name -from paddle.fluid import compiler - -os.environ['CPU_NUM'] = str(4) - - -def as_tensor(np_array_or_tensor, place=None): - if isinstance(np_array_or_tensor, fluid.LoDTensor): - return np_array_or_tensor - - if place is None: - place = fluid.CPUPlace() - - tensor = fluid.LoDTensor() - tensor.set(np_array_or_tensor, place) - return tensor - - -def as_numpy(tensor_or_numpy): - return ( - tensor_or_numpy - if isinstance(tensor_or_numpy, np.ndarray) - else np.array(tensor_or_numpy) - ) - - -def sample_list_to_tensor_array(sample_list): - slot_num = None - slots = None - for sample in sample_list: - if slot_num is None: - slot_num = len(sample) - slots = [None] * len(sample) - else: - assert slot_num == len(sample) - - for slot_id, slot_item in enumerate(sample): - if slots[slot_id] is None: - slots[slot_id] = [] - slots[slot_id].append(slot_item) - - tensor_array = fluid.LoDTensorArray() - for slot in slots: - t = fluid.LoDTensor() - t.set(np.array(slot), fluid.CPUPlace()) - tensor_array.append(t) - - return tensor_array - - -def feed_data(feed_queue, batch_reader): - data_generator = batch_reader() - while True: - data = next(data_generator, None) - if data is None or (len(data) == 1 and data[0] is None): - break - - if not feed_queue.push(sample_list_to_tensor_array(data)): - break - - feed_queue.close() - - -def simple_fc_net( - in_size, - class_num, - hidden_sizes, - batch_size, - queue_capacity, - use_double_buffer=False, - use_feed_list=True, -): - in_data = fluid.layers.data(name="data", dtype='float32', shape=[in_size]) - label = fluid.layers.data(name='label', dtype='int64', shape=[1]) - if use_feed_list: - py_reader = fluid.layers.create_py_reader_by_data( - capacity=queue_capacity, - use_double_buffer=use_double_buffer, - feed_list=[in_data, label], - name=unique_name.generate('py_reader_name'), - ) - else: - py_reader = fluid.layers.py_reader( - capacity=queue_capacity, - shapes=[in_data.shape, label.shape], - dtypes=['float32', 'int64'], - name=unique_name.generate('py_reader_name'), - use_double_buffer=use_double_buffer, - ) - - in_data, label = fluid.layers.read_file(py_reader) - - feed_queue = py_reader.queue - - hidden = in_data - for hidden_size in hidden_sizes: - hidden = fluid.layers.fc( - hidden, - size=hidden_size, - act='tanh', - bias_attr=fluid.ParamAttr( - initializer=fluid.initializer.Constant(value=1.0) - ), - ) - - predict_label = fluid.layers.fc(hidden, size=class_num, act='softmax') - loss = paddle.mean( - fluid.layers.cross_entropy(input=predict_label, label=label) - ) - - optimizer = fluid.optimizer.Adam() - optimizer.minimize(loss) - return in_data, label, loss, optimizer, feed_queue, py_reader - - -class TestPyReaderUsingExecutor(unittest.TestCase): - def setUp(self): - self.in_size = 1000 - self.hidden_sizes = [50, 30, 20] - self.class_num = 10 - self.batch_size = 32 - self.iterations = 10 - self.queue_capacity = 50 - - def test(self): - for use_cuda in ( - [False, True] if core.is_compiled_with_cuda() else [False] - ): - for use_parallel_executor in [False, True]: - for use_double_buffer in [False, True]: - for use_feed_list in [False, True]: - for use_decorate_paddle_reader in [False, True]: - print('Test Parameters:'), - print( - { - 'use_cuda': use_cuda, - 'use_parallel_executor': use_parallel_executor, - 'use_double_buffer': use_double_buffer, - 'use_feed_list': use_feed_list, - 'use_decorate_paddle_reader': use_decorate_paddle_reader, - } - ) - self.main( - use_cuda, - use_parallel_executor, - use_double_buffer, - use_feed_list, - use_decorate_paddle_reader, - ) - - def tensor_reader(self, use_decorate_paddle_reader): - def reader(): - for sample_id in range( - self.batch_size * self.iterations * self.batch_size_times - ): - in_data = np.random.uniform( - low=0, high=1, size=(self.in_size,) - ).astype('float32') - label = np.random.random_integers( - low=0, high=self.class_num - 1, size=(1,) - ).astype('int64') - - reshaped_in_data = np.reshape(in_data, [1, -1]) - reshaped_label = np.reshape(label, [1, -1]) - if sample_id % (self.batch_size * self.batch_size_times) == 0: - self.inputs.append([reshaped_in_data, reshaped_label]) - else: - self.inputs[-1][0] = np.concatenate( - (self.inputs[-1][0], reshaped_in_data), axis=0 - ) - self.inputs[-1][1] = np.concatenate( - (self.inputs[-1][1], reshaped_label), axis=0 - ) - - yield in_data, label - - if not use_decorate_paddle_reader: - yield None - - return reader - - def main( - self, - use_cuda=True, - use_parallel_executor=False, - use_double_buffer=False, - use_feed_list=False, - use_decorate_paddle_reader=False, - ): - assert not use_cuda or use_cuda and core.is_compiled_with_cuda() - - self.use_cuda = use_cuda - self.use_parallel_executor = use_parallel_executor - self.use_double_buffer = use_double_buffer - self.use_feed_list = use_feed_list - self.use_decorate_paddle_reader = use_decorate_paddle_reader - - startup_program = fluid.Program() - main_program = fluid.Program() - - with fluid.program_guard(main_program, startup_program): - ( - in_data, - label, - loss, - optimizer, - feed_queue, - py_reader, - ) = simple_fc_net( - in_size=self.in_size, - class_num=self.class_num, - hidden_sizes=self.hidden_sizes, - batch_size=self.batch_size, - queue_capacity=self.queue_capacity, - use_double_buffer=self.use_double_buffer, - use_feed_list=self.use_feed_list, - ) - - place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() - - exe = fluid.Executor(place) - exe.run(startup_program) - - train_cp = main_program - if use_parallel_executor: - train_cp = compiler.CompiledProgram( - main_program - ).with_data_parallel(loss_name=loss.name) - if use_cuda: - self.batch_size_times = core.get_cuda_device_count() - else: - self.batch_size_times = int( - os.environ.get('CPU_NUM', multiprocessing.cpu_count()) - ) - else: - self.batch_size_times = 1 - - reader = self.tensor_reader(use_decorate_paddle_reader) - batch_reader = paddle.batch(reader, batch_size=self.batch_size) - - self.inputs = [] - self.outputs = [] - - if use_decorate_paddle_reader: - if use_feed_list: - py_reader.decorate_paddle_reader(batch_reader) - else: - py_reader.decorate_sample_list_generator(batch_reader) - py_reader.start() - else: - thread = threading.Thread( - target=feed_data, args=(feed_queue, batch_reader) - ) - thread.daemon = True - thread.start() - - try: - while True: - fetches = exe.run( - train_cp, fetch_list=[in_data.name, label.name] - ) - fetches = [as_numpy(fetch) for fetch in fetches] - self.outputs.append(fetches) - except fluid.core.EOFException: - pass - - feed_queue.close() - self.validate() - if use_decorate_paddle_reader: - py_reader.exited = True - py_reader.thread.join() - else: - thread.join() - - def validate(self): - if not self.use_double_buffer: - self.assertEqual(len(self.inputs), len(self.outputs)) - else: - self.assertTrue(len(self.inputs) >= len(self.outputs)) - for idx in range(len(self.outputs)): - batch_in = self.inputs[idx] - batch_out = self.outputs[idx] - self.assertEqual(len(batch_in), len(batch_out)) - if self.use_parallel_executor and not self.use_double_buffer: - self.validate_unordered_batch(batch_in, batch_out) - else: - for in_data, out_data in zip(batch_in, batch_out): - self.assertEqual(in_data.shape, out_data.shape) - if not self.use_parallel_executor: - self.assertTrue((in_data == out_data).all()) - - def validate_unordered_batch(self, batch_in, batch_out): - out_index_left_set = set(range(self.batch_size * self.batch_size_times)) - mapping_num = 0 - for i in range(self.batch_size * self.batch_size_times): - for j in out_index_left_set: - flag = True - for k in range(len(batch_in)): - in_data = batch_in[k][i] - out_data = batch_out[k][j] - if (in_data != out_data).any(): - flag = False - break - - if flag: - out_index_left_set.remove(j) - mapping_num += 1 - break - - self.assertEqual(mapping_num, self.batch_size * self.batch_size_times) - - -if __name__ == '__main__': - unittest.main() -- GitLab