From 84e7be3178a24ef5e8d826b3687aea79cd40da06 Mon Sep 17 00:00:00 2001 From: Kaipeng Deng Date: Thu, 27 Aug 2020 13:55:31 +0800 Subject: [PATCH] add TensorDataset for multiprocess DataLoader (#26332) * add TensorDataset. test=develop --- .../fluid/dataloader/dataloader_iter.py | 27 +++++++- python/paddle/fluid/dataloader/dataset.py | 55 +++++++++++++++- .../fluid/tests/unittests/CMakeLists.txt | 2 + .../test_multiprocess_dataloader_dataset.py | 63 +++++++++++++++++++ python/paddle/io/__init__.py | 3 +- 5 files changed, 146 insertions(+), 4 deletions(-) create mode 100644 python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dataset.py diff --git a/python/paddle/fluid/dataloader/dataloader_iter.py b/python/paddle/fluid/dataloader/dataloader_iter.py index a81d73d7e9a..aeb4e46504b 100644 --- a/python/paddle/fluid/dataloader/dataloader_iter.py +++ b/python/paddle/fluid/dataloader/dataloader_iter.py @@ -30,7 +30,8 @@ if six.PY2: else: import queue -from .. import core +import paddle +from .. import core, layers from ..framework import in_dygraph_mode from ..multiprocess_utils import CleanupFuncRegistrar, _cleanup_mmap, _set_SIGCHLD_handler from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher @@ -79,7 +80,13 @@ def default_collate_fn(batch): slots.append([item]) else: slots[i].append(item) - return [np.stack(slot, axis=0) for slot in slots] + + if isinstance(slots[0][0], np.ndarray): + return [np.stack(slot, axis=0) for slot in slots] + elif isinstance(slots[0][0], paddle.Tensor): + return [layers.stack(slot, axis=0) for slot in slots] + else: + raise RuntimeError("Unknown data type {}".format(type(slots[0][0]))) class _DatasetKind(object): @@ -284,6 +291,12 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): for slot in batch: if not isinstance(slot, core.LoDTensor): self._check_input_array(slot) + # FIXME(dkp): blocking_queue only support + # core.LoDTensorArray as input now, read + # numpy data into a LoDTensorArray here, + # should support paddle.Tensor list later + if isinstance(slot, paddle.Tensor): + slot = slot.numpy() tmp = core.LoDTensor() tmp.set(slot, core.CPUPlace()) slot = tmp @@ -305,6 +318,8 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase): @classmethod def _check_input_array(cls, item): + if isinstance(item, paddle.Tensor): + return arr = np.array(item) if arr.dtype == np.object: raise TypeError(( @@ -530,6 +545,14 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase): out_queue.put((idx, e)) else: if self._use_shared_memory: + # FIXME(dkp): _convert_to_tensor_list only support np.array + # list now, should support paddle.Tensor list + if isinstance(batch[0][0], paddle.Tensor): + np_batch = [] + for sample in batch: + np_batch.append([s.numpy() for s in sample]) + batch = np_batch + tensor_list = core._convert_to_tensor_list(batch) out_queue.put((idx, tensor_list)) core._remove_tensor_list_mmap_fds(tensor_list) diff --git a/python/paddle/fluid/dataloader/dataset.py b/python/paddle/fluid/dataloader/dataset.py index e47f57381c0..13bb946a5eb 100644 --- a/python/paddle/fluid/dataloader/dataset.py +++ b/python/paddle/fluid/dataloader/dataset.py @@ -14,9 +14,10 @@ from __future__ import print_function +from .. import framework import paddle.dataset.common -__all__ = ["Dataset", "IterableDataset"] +__all__ = ["Dataset", "IterableDataset", "TensorDataset"] class Dataset(object): @@ -222,3 +223,55 @@ class IterableDataset(Dataset): def __len__(self): raise RuntimeError("'{}' should not be called for IterableDataset" \ "{}".format('__len__', self.__class__.__name__)) + + +class TensorDataset(Dataset): + """ + Dataset defined by a list of tensors. + + Each tensor should be in shape of [N, ...], while N is the sample number, + and ecah tensor contains a field of sample, :code:`TensorDataset` retrieve + each sample by indexing tensors in the 1st dimension. + + Args: + tensors(list of Tensor): tensors with same shape in the 1st dimension. + + Returns: + Dataset: a Dataset instance wrapping tensors. + + Examples: + + .. code-block:: python + + import numpy as np + import paddle + from paddle.io import TensorDataset + + paddle.disable_static() + + input_np = np.random.random([2, 3, 4]).astype('float32') + input = paddle.to_tensor(input_np) + label_np = np.random.random([2, 1]).astype('int32') + label = paddle.to_tensor(label_np) + + dataset = TensorDataset([input, label]) + + for i in range(len(dataset)): + input, label = dataset[i] + print(input, label) + + """ + + def __init__(self, tensors): + if not framework.in_dygraph_mode(): + raise RuntimeError( + "TensorDataset con only be used in imperative mode") + assert all([tensor.shape[0] == tensors[0].shape[0] for tensor in tensors]), \ + "tensors not have same shape of the 1st dimension" + self.tensors = tensors + + def __getitem__(self, index): + return tuple(tensor[index] for tensor in self.tensors) + + def __len__(self): + return self.tensors[0].shape[0] diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 5122f961f48..aa4fb445b4d 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -347,6 +347,7 @@ if (APPLE OR WIN32) list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dynamic) list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_exception) list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_iterable_dataset) + list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dataset) endif() if(NOT WITH_GPU OR WIN32 OR APPLE) @@ -580,6 +581,7 @@ if(NOT WIN32 AND NOT APPLE) set_tests_properties(test_multiprocess_dataloader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") set_tests_properties(test_multiprocess_dataloader_iterable_dataset_static PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") set_tests_properties(test_multiprocess_dataloader_iterable_dataset_dynamic PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") + set_tests_properties(test_multiprocess_dataloader_dataset PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE") endif() # setting timeout value for old unittests diff --git a/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dataset.py b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dataset.py new file mode 100644 index 00000000000..6e2f9562b45 --- /dev/null +++ b/python/paddle/fluid/tests/unittests/test_multiprocess_dataloader_dataset.py @@ -0,0 +1,63 @@ +# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import division + +import unittest +import numpy as np + +import paddle +import paddle.fluid as fluid +from paddle.io import TensorDataset, DataLoader +from paddle.fluid.dygraph.base import to_variable + + +class TestTensorDataset(unittest.TestCase): + def run_main(self, num_workers, places): + fluid.default_startup_program().random_seed = 1 + fluid.default_main_program().random_seed = 1 + place = fluid.CPUPlace() + with fluid.dygraph.guard(place): + input_np = np.random.random([16, 3, 4]).astype('float32') + input = to_variable(input_np) + label_np = np.random.random([16, 1]).astype('int32') + label = to_variable(label_np) + + dataset = TensorDataset([input, label]) + assert len(dataset) == 16 + dataloader = DataLoader( + dataset, + places=place, + num_workers=num_workers, + batch_size=1, + drop_last=True) + + for i, (input, label) in enumerate(dataloader()): + assert len(input) == 1 + assert len(label) == 1 + assert input.shape == [1, 3, 4] + assert label.shape == [1, 1] + assert isinstance(input, paddle.Tensor) + assert isinstance(label, paddle.Tensor) + assert np.allclose(input.numpy(), input_np[i]) + assert np.allclose(label.numpy(), label_np[i]) + + def test_main(self): + for p in [fluid.CPUPlace(), fluid.CUDAPlace(0)]: + for num_workers in [0, 2]: + ret = self.run_main(num_workers=num_workers, places=p) + + +if __name__ == '__main__': + unittest.main() diff --git a/python/paddle/io/__init__.py b/python/paddle/io/__init__.py index 89bbd591657..78f792d6a5a 100644 --- a/python/paddle/io/__init__.py +++ b/python/paddle/io/__init__.py @@ -16,6 +16,7 @@ __all__ = [ 'Dataset', 'IterableDataset', + 'TensorDataset', 'BatchSampler', # 'Transform', 'DataLoader', @@ -42,7 +43,7 @@ __all__ = [ from ..fluid.io import DataLoader from ..fluid.dataloader import Dataset, IterableDataset, BatchSampler, get_worker_info, \ - Sampler, SequenceSampler, RandomSampler + TensorDataset, Sampler, SequenceSampler, RandomSampler from ..fluid.io import load, save, load_program_state, set_program_state, \ load_inference_model, save_inference_model, batch from ..reader import shuffle, buffered, cache, chain, firstn, compose, map_readers, xmap_readers -- GitLab