diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index ac6e05248f72d5a0499138586b25f6f35c4822af..3c801f6de89e3eb1365fc371264a10c0e8cd677b 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -38,7 +38,27 @@ from ..multiprocess_utils import CleanupFuncRegistrar, _cleanup_mmap, _set_SIGCH MP_INDICES_CHECK_INTERVAL = 5 -def _default_collate_fn(batch): +def default_collate_fn(batch): + """ + Default batch collating function for :code:`fluid.io.DataLoader`, + batch should be a list of samples, and each sample should be a list + of fields as follows: + + [[field1, field2, ...], [field1, field2, ...], ...] + + This default collate function zipped each field together and stack + each field as the batch field as follows: + + [batch_field1, batch_field2, ...] + + Args: + batch(list of list of numpy array): the batch data, each fields + should be a numpy array, each sample should be a list of + fields, and batch should be a list of sample. + + Returns: + a list of numpy array: collated batch + """ sample = batch[0] # dataset has only 1 field if isinstance(sample, np.ndarray): @@ -82,7 +102,7 @@ class _DataLoaderIterBase(object): self._return_list = loader.return_list self._batch_sampler = loader.batch_sampler self._sampler_iter = iter(loader.batch_sampler) - self._collate_fn = loader.collate_fn or _default_collate_fn + self._collate_fn = loader.collate_fn or default_collate_fn self._num_workers = loader.num_workers self._use_buffer_reader = loader.use_buffer_reader self._use_shared_memory = loader.use_shared_memory diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index 6b716ff82f10a4d7a47489a15568b6dfbf0f87fc..4c1f0f1b38223b6ccf084d8b68f737a22371b12a 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -23,7 +23,7 @@ from .executor import global_scope from .data_feeder import DataFeeder, BatchedTensorProvider from .multiprocess_utils import multiprocess_queue_set, CleanupFuncRegistrar, _cleanup_mmap, _cleanup, _set_SIGCHLD_handler from .dataloader import BatchSampler, Dataset -from .dataloader.dataloader_iter import _DataLoaderIterSingleProcess, _DataLoaderIterMultiProcess +from .dataloader.dataloader_iter import _DataLoaderIterSingleProcess, _DataLoaderIterMultiProcess, default_collate_fn from .layers.io import monkey_patch_reader_methods, _copy_reader_var_, double_buffer from .unique_name import UniqueNameGenerator import logging @@ -43,7 +43,7 @@ else: # NOTE: [ avoid hanging & failed quickly ] These value is used in getting data from another process QUEUE_GET_TIMEOUT = 60 -__all__ = ['PyReader', 'DataLoader'] +__all__ = ['PyReader', 'DataLoader', 'default_collate_fn'] data_loader_unique_name_generator = UniqueNameGenerator() diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index b06f5a25f08d5fdcfdf3fb4ba5e69fa8b5b982ee..22190c05b325ba6226c5e90f56a58e69ac0f7d17 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -210,6 +210,7 @@ if (APPLE OR WIN32) list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_fds_clear) list(REMOVE_ITEM TEST_OPS test_imperative_data_loader_exit_func) list(REMOVE_ITEM TEST_OPS test_imperative_signal_handler) + list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_exception) endif() if(NOT WITH_GPU OR WIN32 OR APPLE) @@ -378,7 +379,8 @@ set_tests_properties(test_parallel_executor_crf test_sync_batch_norm_op test_inp PROPERTIES LABELS "RUN_TYPE=DIST" RUN_SERIAL TRUE) if(NOT WIN32 AND NOT APPLE) - set_tests_properties(test_imperative_data_loader_base PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" RUN_SERIAL TRUE) - set_tests_properties(test_imperative_data_loader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" RUN_SERIAL TRUE) - set_tests_properties(test_imperative_data_loader_fds_clear PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE" RUN_SERIAL TRUE) + set_tests_properties(test_imperative_data_loader_base PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") + set_tests_properties(test_imperative_data_loader_fds_clear PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") + set_tests_properties(test_imperative_data_loader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") + set_tests_properties(test_multiprocess_dataloader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") endif() diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py new file mode 100644 index 0000000000000000000000000000000000000000..e19d17f0a0e7b07044d8b5583f7d0d1033db7a09 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_exception.py @@ -0,0 +1,199 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import os +import sys +import six +import time +import unittest +import multiprocessing +import numpy as np + +import paddle.fluid as fluid +from paddle.fluid.io import Dataset, BatchSampler, DataLoader +from paddle.fluid.dygraph.nn import Linear +from paddle.fluid.dygraph.base import to_variable + + +class RandomDataset(Dataset): + def __init__(self, sample_num): + self.sample_num = sample_num + + def __getitem__(self, idx): + np.random.seed(idx) + image = np.random.random([784]).astype('float32') + label = np.random.randint(0, 9, (1, )).astype('int64') + return image, label + + def __len__(self): + return self.sample_num + + +class TestDataLoaderAssert(unittest.TestCase): + def test_main(self): + place = fluid.cpu_places()[0] + with fluid.dygraph.guard(place): + dataset = RandomDataset(100) + batch_sampler = BatchSampler(dataset=dataset, batch_size=4) + + # dataset is not instance of Dataset + try: + loader = DataLoader(dataset=batch_sampler, places=place) + self.assertTrue(False) + except AssertionError: + pass + + # places is None + try: + loader = DataLoader(dataset=dataset, places=None) + self.assertTrue(False) + except AssertionError: + pass + + # num_workers < 0 + try: + loader = DataLoader( + dataset=dataset, places=place, num_workers=-1) + self.assertTrue(False) + except AssertionError: + pass + + # timeout < 0 + try: + loader = DataLoader(dataset=dataset, places=place, timeout=-1) + self.assertTrue(False) + except AssertionError: + pass + + # batch_sampler is not instance of BatchSampler + try: + loader = DataLoader( + dataset=dataset, places=place, batch_sampler=dataset) + self.assertTrue(False) + except AssertionError: + pass + + # set batch_sampler and shuffle/batch_size/drop_last + try: + loader = DataLoader( + dataset=dataset, + places=place, + batch_sampler=batch_sampler, + shuffle=True, + drop_last=True) + self.assertTrue(False) + except AssertionError: + pass + + # set batch_sampler correctly + try: + loader = DataLoader( + dataset=dataset, places=place, batch_sampler=batch_sampler) + self.assertTrue(True) + except AssertionError: + self.assertTrue(False) + + +# CI Converage cannot record stub in subprocess, +# HACK a _worker_loop in main process call here +class TestDataLoaderWorkerLoop(unittest.TestCase): + def run_without_worker_done(self, use_shared_memory=True): + try: + place = fluid.cpu_places()[0] + with fluid.dygraph.guard(place): + dataset = RandomDataset(800) + + # test init_fn + def _init_fn(worker_id): + pass + + # test collate_fn + def _collate_fn(sample_list): + return [ + np.stack( + s, axis=0) for s in list(zip(*sample_list)) + ] + + loader = DataLoader( + dataset, + num_workers=1, + places=place, + use_shared_memory=use_shared_memory) + assert loader.num_workers > 0, \ + "go to AssertionError and pass in Mac and Windows" + loader = iter(loader) + print("loader length", len(loader)) + indices_queue = multiprocessing.Queue() + for i in range(10): + indices_queue.put([i, i + 10]) + indices_queue.put(None) + loader._worker_loop( + loader._dataset, indices_queue, loader._data_queue, + loader._workers_done_event, _collate_fn, _init_fn, 0) + self.assertTrue(False) + except AssertionError: + pass + except Exception: + self.assertTrue(False) + + def run_with_worker_done(self, use_shared_memory=True): + try: + place = fluid.cpu_places()[0] + with fluid.dygraph.guard(place): + dataset = RandomDataset(800) + + # test init_fn + def _init_fn(worker_id): + pass + + # test collate_fn + def _collate_fn(sample_list): + return [ + np.stack( + s, axis=0) for s in list(zip(*sample_list)) + ] + + loader = DataLoader( + dataset, + num_workers=1, + places=place, + use_shared_memory=use_shared_memory) + assert loader.num_workers > 0, \ + "go to AssertionError and pass in Mac and Windows" + loader = iter(loader) + print("loader length", len(loader)) + indices_queue = multiprocessing.Queue() + for i in range(10): + indices_queue.put([i, i + 10]) + indices_queue.put(None) + loader._workers_done_event.set() + loader._worker_loop( + loader._dataset, indices_queue, loader._data_queue, + loader._workers_done_event, _collate_fn, _init_fn, 0) + self.assertTrue(True) + except AssertionError: + pass + except Exception: + self.assertTrue(False) + + def test_main(self): + for use_shared_memory in [True, False]: + self.run_without_worker_done(use_shared_memory) + self.run_with_worker_done(use_shared_memory) + + +if __name__ == '__main__': + unittest.main()