Unverified commit 84e7be31, authored by Kaipeng Deng, committed by GitHub

add TensorDataset for multiprocess DataLoader (#26332)

* add TensorDataset. test=develop
Parent 2024ef69
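
For context, here is a minimal usage sketch of what this change enables, condensed from the TensorDataset docstring and the new unit test below; it is illustrative only and not part of the diff (the places argument and the dataloader() call style follow the unit test):

import numpy as np
import paddle
from paddle.io import TensorDataset, DataLoader

paddle.disable_static()  # TensorDataset requires imperative (dygraph) mode

# The 1st dimension is the sample dimension; each tensor holds one field.
images = paddle.to_tensor(np.random.random([16, 3, 4]).astype('float32'))
labels = paddle.to_tensor(np.random.random([16, 1]).astype('int32'))
dataset = TensorDataset([images, labels])

# With this commit, num_workers > 0 (multiprocess loading) also works when
# the samples are paddle.Tensor rather than np.ndarray.
loader = DataLoader(dataset,
                    places=paddle.CPUPlace(),
                    batch_size=4,
                    num_workers=2,
                    drop_last=True)
for image_batch, label_batch in loader():
    print(image_batch.shape, label_batch.shape)  # [4, 3, 4] [4, 1]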
@@ -30,7 +30,8 @@ if six.PY2:
else:
    import queue
-from .. import core
+import paddle
+from .. import core, layers
from ..framework import in_dygraph_mode
from ..multiprocess_utils import CleanupFuncRegistrar, _cleanup_mmap, _set_SIGCHLD_handler
from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher
@@ -79,7 +80,13 @@ def default_collate_fn(batch):
                slots.append([item])
            else:
                slots[i].append(item)
-    return [np.stack(slot, axis=0) for slot in slots]
+    if isinstance(slots[0][0], np.ndarray):
+        return [np.stack(slot, axis=0) for slot in slots]
+    elif isinstance(slots[0][0], paddle.Tensor):
+        return [layers.stack(slot, axis=0) for slot in slots]
+    else:
+        raise RuntimeError("Unknown data type {}".format(type(slots[0][0])))


class _DatasetKind(object):
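
As an aside, the branch added to default_collate_fn above can be sketched as a standalone function roughly like this (collate_sketch is a hypothetical name, and the public paddle.stack is used in place of the internal layers module):

import numpy as np
import paddle

def collate_sketch(batch):
    # Group the i-th field of every sample together, then stack each group
    # along a new leading batch dimension.
    slots = list(zip(*batch))
    first = slots[0][0]
    if isinstance(first, np.ndarray):
        return [np.stack(slot, axis=0) for slot in slots]
    elif isinstance(first, paddle.Tensor):
        return [paddle.stack(list(slot), axis=0) for slot in slots]
    else:
        raise RuntimeError("Unknown data type {}".format(type(first)))

paddle.disable_static()
samples = [(paddle.ones([3, 4]), paddle.zeros([1], dtype='int64')) for _ in range(8)]
image_batch, label_batch = collate_sketch(samples)
print(image_batch.shape, label_batch.shape)  # [8, 3, 4] [8, 1]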
@@ -284,6 +291,12 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
                for slot in batch:
                    if not isinstance(slot, core.LoDTensor):
                        self._check_input_array(slot)
+                        # FIXME(dkp): blocking_queue only supports
+                        #             core.LoDTensorArray as input now, so read
+                        #             paddle.Tensor data back into numpy here;
+                        #             support a paddle.Tensor list later
+                        if isinstance(slot, paddle.Tensor):
+                            slot = slot.numpy()
                        tmp = core.LoDTensor()
                        tmp.set(slot, core.CPUPlace())
                        slot = tmp
@@ -305,6 +318,8 @@ class _DataLoaderIterSingleProcess(_DataLoaderIterBase):
    @classmethod
    def _check_input_array(cls, item):
+        if isinstance(item, paddle.Tensor):
+            return
        arr = np.array(item)
        if arr.dtype == np.object:
            raise TypeError((
@@ -530,6 +545,14 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
                    out_queue.put((idx, e))
            else:
                if self._use_shared_memory:
+                    # FIXME(dkp): _convert_to_tensor_list only supports a list of
+                    #             np.ndarray now; support a paddle.Tensor list later
+                    if isinstance(batch[0][0], paddle.Tensor):
+                        np_batch = []
+                        for sample in batch:
+                            np_batch.append([s.numpy() for s in sample])
+                        batch = np_batch
                    tensor_list = core._convert_to_tensor_list(batch)
                    out_queue.put((idx, tensor_list))
                    core._remove_tensor_list_mmap_fds(tensor_list)
......
@@ -14,9 +14,10 @@
from __future__ import print_function

+from .. import framework
import paddle.dataset.common

-__all__ = ["Dataset", "IterableDataset"]
+__all__ = ["Dataset", "IterableDataset", "TensorDataset"]


class Dataset(object):

@@ -222,3 +223,55 @@ class IterableDataset(Dataset):
    def __len__(self):
        raise RuntimeError("'{}' should not be called for IterableDataset" \
            "{}".format('__len__', self.__class__.__name__))
+
+class TensorDataset(Dataset):
+    """
+    Dataset defined by a list of tensors.
+
+    Each tensor should have a shape of [N, ...], where N is the sample
+    number and each tensor represents one field of the samples.
+    :code:`TensorDataset` retrieves each sample by indexing the tensors
+    along the 1st dimension.
+
+    Args:
+        tensors(list of Tensor): tensors with the same size in the 1st dimension.
+
+    Returns:
+        Dataset: a Dataset instance wrapping tensors.
+
+    Examples:
+
+        .. code-block:: python
+
+            import numpy as np
+            import paddle
+            from paddle.io import TensorDataset
+
+            paddle.disable_static()
+
+            input_np = np.random.random([2, 3, 4]).astype('float32')
+            input = paddle.to_tensor(input_np)
+            label_np = np.random.random([2, 1]).astype('int32')
+            label = paddle.to_tensor(label_np)
+
+            dataset = TensorDataset([input, label])
+
+            for i in range(len(dataset)):
+                input, label = dataset[i]
+                print(input, label)
+    """
+
+    def __init__(self, tensors):
+        if not framework.in_dygraph_mode():
+            raise RuntimeError(
+                "TensorDataset can only be used in imperative mode")
+        assert all([tensor.shape[0] == tensors[0].shape[0] for tensor in tensors]), \
+            "tensors do not have the same shape in the 1st dimension"
+        self.tensors = tensors
+
+    def __getitem__(self, index):
+        return tuple(tensor[index] for tensor in self.tensors)
+
+    def __len__(self):
+        return self.tensors[0].shape[0]
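
A short note on the indexing semantics above: __getitem__ simply indexes every wrapped tensor along the 1st dimension, so a sample comes back as a tuple of tensors without the leading sample axis, and the paddle.Tensor branch of default_collate_fn is what stacks samples back into [batch_size, ...]. A small illustrative check (assuming the usual dygraph behaviour of integer indexing on paddle.Tensor):

import numpy as np
import paddle
from paddle.io import TensorDataset

paddle.disable_static()
features = paddle.to_tensor(np.arange(12, dtype='float32').reshape([4, 3]))
targets = paddle.to_tensor(np.arange(4, dtype='int64').reshape([4, 1]))
dataset = TensorDataset([features, targets])

x, y = dataset[2]        # tuple of per-sample tensors
print(len(dataset))      # 4
print(x.shape, y.shape)  # [3] [1] -- the leading sample dimension is indexed away
print(x.numpy())         # [6. 7. 8.]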
@@ -347,6 +347,7 @@ if (APPLE OR WIN32)
    list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dynamic)
    list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_exception)
    list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_iterable_dataset)
+    list(REMOVE_ITEM TEST_OPS test_multiprocess_dataloader_dataset)
endif()

if(NOT WITH_GPU OR WIN32 OR APPLE)

@@ -580,6 +581,7 @@ if(NOT WIN32 AND NOT APPLE)
    set_tests_properties(test_multiprocess_dataloader_exception PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
    set_tests_properties(test_multiprocess_dataloader_iterable_dataset_static PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
    set_tests_properties(test_multiprocess_dataloader_iterable_dataset_dynamic PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
+    set_tests_properties(test_multiprocess_dataloader_dataset PROPERTIES LABELS "RUN_TYPE=EXCLUSIVE")
endif()

# setting timeout value for old unittests
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
import unittest
import numpy as np
import paddle
import paddle.fluid as fluid
from paddle.io import TensorDataset, DataLoader
from paddle.fluid.dygraph.base import to_variable
class TestTensorDataset(unittest.TestCase):
def run_main(self, num_workers, places):
fluid.default_startup_program().random_seed = 1
fluid.default_main_program().random_seed = 1
place = fluid.CPUPlace()
with fluid.dygraph.guard(place):
input_np = np.random.random([16, 3, 4]).astype('float32')
input = to_variable(input_np)
label_np = np.random.random([16, 1]).astype('int32')
label = to_variable(label_np)
dataset = TensorDataset([input, label])
assert len(dataset) == 16
dataloader = DataLoader(
dataset,
places=place,
num_workers=num_workers,
batch_size=1,
drop_last=True)
for i, (input, label) in enumerate(dataloader()):
assert len(input) == 1
assert len(label) == 1
assert input.shape == [1, 3, 4]
assert label.shape == [1, 1]
assert isinstance(input, paddle.Tensor)
assert isinstance(label, paddle.Tensor)
assert np.allclose(input.numpy(), input_np[i])
assert np.allclose(label.numpy(), label_np[i])
def test_main(self):
for p in [fluid.CPUPlace(), fluid.CUDAPlace(0)]:
for num_workers in [0, 2]:
ret = self.run_main(num_workers=num_workers, places=p)
if __name__ == '__main__':
unittest.main()
@@ -16,6 +16,7 @@
__all__ = [
    'Dataset',
    'IterableDataset',
+    'TensorDataset',
    'BatchSampler',
    # 'Transform',
    'DataLoader',

@@ -42,7 +43,7 @@ __all__ = [
from ..fluid.io import DataLoader
from ..fluid.dataloader import Dataset, IterableDataset, BatchSampler, get_worker_info, \
-    Sampler, SequenceSampler, RandomSampler
+    TensorDataset, Sampler, SequenceSampler, RandomSampler
from ..fluid.io import load, save, load_program_state, set_program_state, \
    load_inference_model, save_inference_model, batch
from ..reader import shuffle, buffered, cache, chain, firstn, compose, map_readers, xmap_readers