Unverified commit 793f3b93 authored by Kaipeng Deng, committed by GitHub

move DataLoader code to paddle.io (#48699)

* move DataLoader to paddle.io. test=develop
Parent 6f28eb70
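In effect, loader components that previously imported from paddle.fluid.io and paddle.fluid.dataloader now resolve from paddle.io (public API) and paddle.io.dataloader (internal modules). A minimal before/after sketch of the import change, assuming a build that contains this commit:

# before this commit: fluid-era paths
# from paddle.fluid.io import DataLoader
# from paddle.fluid.dataloader.batch_sampler import DistributedBatchSampler

# after: public names come from paddle.io, internals from paddle.io.dataloader
from paddle.io import DataLoader, DistributedBatchSampler
from paddle.io.dataloader.dataloader_iter import _DatasetKind  # internal, may change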
......@@ -17,16 +17,16 @@ import abc
import numpy as np
import paddle
from paddle.fluid.dataloader.batch_sampler import (
from paddle.io import BatchSampler, IterableDataset
from paddle.io.dataloader.batch_sampler import (
DistributedBatchSampler,
_InfiniteIterableSampler,
)
from paddle.fluid.dataloader.dataloader_iter import (
from paddle.io.dataloader.dataloader_iter import (
_DatasetKind,
default_collate_fn,
default_convert_fn,
)
from paddle.io import BatchSampler, IterableDataset
class DistributedDataLoaderBase(metaclass=abc.ABCMeta):
......@@ -272,7 +272,7 @@ class DistributedDataLoader(DistributedDataLoaderBase):
return next(self.data)
def _create_inner_dataloader(self):
dataloader = paddle.fluid.io.DataLoader(
dataloader = paddle.io.DataLoader(
self.dataset,
feed_list=self.feed_list,
places=self.places,
......
......@@ -55,8 +55,6 @@ from paddle.fluid.log_helper import get_logger
from . import reader
from . import unique_name
from .reader import *
from . import dataloader
from .dataloader import *
from . import core
from paddle.utils import deprecated
from paddle.fluid.framework import static_only
......
This diff is collapsed.
......@@ -37,8 +37,8 @@ from paddle.distributed.fleet.meta_parallel.parallel_layers.pp_layers import (
)
from paddle.distributed.sharding.group_sharded import group_sharded_parallel
from paddle.distributed.utils.log_utils import get_logger
from paddle.fluid.dataloader.dataset import IterableDataset
from paddle.incubate.distributed.utils.io import save_for_auto_inference
from paddle.io import IterableDataset
from paddle.nn import Linear
logger = get_logger("INFO", __file__)
......
......@@ -406,7 +406,7 @@ class TestDataLoaderGenerateStates(unittest.TestCase):
]
def test_main(self):
from paddle.fluid.dataloader.worker import _generate_states
from paddle.io.dataloader.worker import _generate_states
for inp, outp in zip(self.inputs, self.outputs):
out = _generate_states(*inp)
......
......@@ -19,8 +19,8 @@ import numpy as np
from paddle import fluid
from paddle.fluid import core
from paddle.fluid.dataloader.dataloader_iter import _worker_loop
from paddle.io import BatchSampler, DataLoader, Dataset, IterableDataset
from paddle.io.dataloader.worker import _worker_loop
class RandomDataset(Dataset):
......
......@@ -84,7 +84,7 @@ def set_config(config=None):
if config is None:
core.enable_autotune()
core.enable_layout_autotune()
paddle.fluid.reader.set_autotune_config(use_autotune=True)
paddle.io.reader.set_autotune_config(use_autotune=True)
return
config_dict = {}
......@@ -147,7 +147,7 @@ def set_config(config=None):
)
if "tuning_steps" in dataloader_config:
if isinstance(dataloader_config['tuning_steps'], int):
paddle.fluid.reader.set_autotune_config(
paddle.io.reader.set_autotune_config(
use_autoune, dataloader_config['tuning_steps']
)
else:
......@@ -155,4 +155,4 @@ def set_config(config=None):
"The auto-tuning configuration of the dataloader is incorrect."
"The `tuning_steps` should be int. Use default parameter instead."
)
paddle.fluid.reader.set_autotune_config(use_autoune)
paddle.io.reader.set_autotune_config(use_autoune)
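For context, set_config above ends up calling the relocated paddle.io.reader.set_autotune_config. A hedged sketch of driving it through the public entry point, assuming this is the set_config exposed as paddle.incubate.autotune.set_config (only the 'tuning_steps' key is visible in this hunk; 'enable' is assumed):

import paddle

# sketch: enable dataloader autotuning, probing reader cost for 500 steps
paddle.incubate.autotune.set_config({
    "dataloader": {
        "enable": True,        # assumed key; not shown in this hunk
        "tuning_steps": 500,
    }
})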
......@@ -14,21 +14,21 @@
# TODO: define all functions about input & output in this directory
from ..fluid.io import DataLoader # noqa: F401
from ..fluid.dataloader import Dataset # noqa: F401
from ..fluid.dataloader import IterableDataset # noqa: F401
from ..fluid.dataloader import BatchSampler # noqa: F401
from ..fluid.dataloader import get_worker_info # noqa: F401
from ..fluid.dataloader import TensorDataset # noqa: F401
from ..fluid.dataloader import Sampler # noqa: F401
from ..fluid.dataloader import SequenceSampler # noqa: F401
from ..fluid.dataloader import RandomSampler # noqa: F401
from ..fluid.dataloader import DistributedBatchSampler # noqa: F401
from ..fluid.dataloader import ComposeDataset # noqa: F401
from ..fluid.dataloader import ChainDataset # noqa: F401
from ..fluid.dataloader import WeightedRandomSampler # noqa: F401
from ..fluid.dataloader import Subset # noqa: F401
from ..fluid.dataloader import random_split # noqa: F401
from .reader import DataLoader # noqa: F401
from .dataloader import Dataset # noqa: F401
from .dataloader import IterableDataset # noqa: F401
from .dataloader import BatchSampler # noqa: F401
from .dataloader import get_worker_info # noqa: F401
from .dataloader import TensorDataset # noqa: F401
from .dataloader import Sampler # noqa: F401
from .dataloader import SequenceSampler # noqa: F401
from .dataloader import RandomSampler # noqa: F401
from .dataloader import DistributedBatchSampler # noqa: F401
from .dataloader import ComposeDataset # noqa: F401
from .dataloader import ChainDataset # noqa: F401
from .dataloader import WeightedRandomSampler # noqa: F401
from .dataloader import Subset # noqa: F401
from .dataloader import random_split # noqa: F401
__all__ = [ # noqa
'Dataset',
......
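The public surface of paddle.io is unchanged by this rewire; a quick sketch that should behave identically before and after the commit:

import paddle
from paddle.io import Dataset, DataLoader

class Squares(Dataset):
    # map-style dataset: indexable and sized
    def __getitem__(self, idx):
        return idx * idx

    def __len__(self):
        return 8

loader = DataLoader(Squares(), batch_size=4)
for batch in loader:
    print(batch)  # two batches of four squared values each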
......@@ -12,21 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from . import dataset
from .dataset import *
from .dataset import Dataset
from .dataset import IterableDataset
from .dataset import TensorDataset
from .dataset import ComposeDataset
from .dataset import ChainDataset
from .dataset import random_split
from .dataset import Subset
from . import batch_sampler
from .batch_sampler import *
from .batch_sampler import BatchSampler
from .batch_sampler import DistributedBatchSampler
from . import dataloader_iter
from .dataloader_iter import *
from .worker import get_worker_info
from . import sampler
from .sampler import *
__all__ = (
dataset.__all__
+ batch_sampler.__all__
+ dataloader_iter.__all__
+ sampler.__all__
)
from .sampler import Sampler
from .sampler import SequenceSampler
from .sampler import RandomSampler
from .sampler import WeightedRandomSampler
......@@ -12,13 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import math
from .sampler import Sampler, SequenceSampler, RandomSampler
from .dataset import Dataset, IterableDataset
import numpy as np
__all__ = ["BatchSampler", "DistributedBatchSampler"]
from .dataset import IterableDataset
from .sampler import RandomSampler, Sampler, SequenceSampler
class BatchSampler(Sampler):
......
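For reference, a short usage sketch of the BatchSampler surface this module exports, assuming the documented constructor arguments:

from paddle.io import BatchSampler, SequenceSampler

# batch indices over a 10-sample data source
sampler = SequenceSampler(list(range(10)))
batch_sampler = BatchSampler(sampler=sampler, batch_size=3, drop_last=False)
for indices in batch_sampler:
    print(indices)  # [0, 1, 2], [3, 4, 5], [6, 7, 8], [9]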
......@@ -12,13 +12,14 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import numbers
from collections.abc import Mapping, Sequence
import numpy as np
from ..framework import _non_static_mode
from .. import core, layers
from collections.abc import Sequence, Mapping
import paddle
from ...framework import core
def default_collate_fn(batch):
......
......@@ -12,51 +12,39 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import logging
import os
import queue
import sys
import time
import signal
import numbers
import logging
import itertools
import threading
import time
import warnings
import numpy as np
from collections import namedtuple
from paddle.fluid.framework import (
_set_expected_place,
_current_expected_place,
set_flags,
)
import queue
import numpy as np
import paddle
import paddle.profiler as profiler
from paddle import profiler
from paddle.fluid.framework import _current_expected_place, _set_expected_place
from paddle.profiler.timer import benchmark
from paddle.profiler.utils import in_profiler_mode
from .. import core, layers
from ..framework import in_dygraph_mode
from ...framework import core, in_dygraph_mode
from ..multiprocess_utils import (
_set_SIGCHLD_handler,
MP_STATUS_CHECK_INTERVAL,
CleanupFuncRegistrar,
_set_SIGCHLD_handler,
)
from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher
from .batch_sampler import _InfiniteIterableSampler
from .collate import default_collate_fn, default_convert_fn
from .flat import _flatten_batch, _restore_batch
from .worker import (
ParentWatchDog,
get_worker_info,
_worker_loop,
_DatasetKind,
_IterableDatasetStopIteration,
_WorkerException,
_ResumeIteration,
_worker_loop,
_WorkerException,
)
from .flat import _flatten_batch, _restore_batch
from paddle.profiler.timer import benchmark
__all__ = ['get_worker_info']
# NOTE: fix `terminate called without an active exception`
# if for loop break and program exit immediately(with no model
......@@ -95,7 +83,7 @@ class _DataLoaderIterBase:
data by setting in given dataloader.
Args:
loader(instance of DataLoader): instance of `fluid.io.DataLoader`
loader(instance of DataLoader): instance of `paddle.io.DataLoader`
"""
def __init__(self, loader):
......@@ -439,7 +427,7 @@ class _DataLoaderIterMultiProcess(_DataLoaderIterBase):
self._shutdown = False
def _init_workers(self):
import paddle.incubate.multiprocessing as multiprocessing
from paddle.incubate import multiprocessing
# multiprocess worker and indice queue list initial as empty
self._workers = []
......
......@@ -13,17 +13,8 @@
# limitations under the License.
import paddle
from .. import framework
__all__ = [
"Dataset",
"IterableDataset",
"TensorDataset",
"ComposeDataset",
"ChainDataset",
"random_split",
"Subset",
]
from ... import framework
class Dataset:
......
......@@ -12,12 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from ..log_helper import get_logger
from collections.abc import Sequence, Mapping
_WARNING_TO_LOG = True
class _DatasetFetcher:
def __init__(self, dataset, auto_collate_batch, collate_fn, drop_last):
......@@ -37,47 +31,8 @@ class _DatasetFetcher:
# each sample processing in the batch
def fetch(self, batch_indices, done_event=None):
raise NotImplementedError(
"'fetch' not implement for class {}".format(self.__class__.__name__)
)
def _log_warning(self):
# only log warning on GPU 0 when distributed launch
from ...distributed import get_world_size, get_rank
if get_world_size() >= 2 and get_rank() != 0:
return
warn_str = (
"Detect dataset only contains single fileds, return format "
"changed since Paddle 2.1. In Paddle <= 2.0, DataLoader add "
"a list surround output data(e.g. return [data]), and in "
"Paddle >= 2.1, DataLoader return the single filed directly "
"(e.g. return data). For example, in following code: \n\n"
)
warn_str += (
"import numpy as np\n"
"from paddle.io import DataLoader, Dataset\n\n"
"class RandomDataset(Dataset):\n"
" def __getitem__(self, idx):\n"
" data = np.random.random((2, 3)).astype('float32')\n\n"
" return data\n\n"
" def __len__(self):\n"
" return 10\n\n"
"dataset = RandomDataset()\n"
"loader = DataLoader(dataset, batch_size=1)\n"
"data = next(loader())\n\n"
)
warn_str += (
"In Paddle <= 2.0, data is in format '[Tensor(shape=(1, 2, 3), "
"dtype=float32)]', and in Paddle >= 2.1, data is in format"
" 'Tensor(shape=(1, 2, 3), dtype=float32)'\n"
)
logger = get_logger(
"DataLoader", logging.INFO, fmt='%(levelname)s: %(message)s'
f"'fetch' not implement for class {self.__class__.__name__}"
)
logger.warning(warn_str)
class _IterableDatasetFetcher(_DatasetFetcher):
......@@ -103,10 +58,6 @@ class _IterableDatasetFetcher(_DatasetFetcher):
):
raise StopIteration
global _WARNING_TO_LOG
if not isinstance(data[0], (Sequence, Mapping)) and _WARNING_TO_LOG:
self._log_warning()
_WARNING_TO_LOG = False
else:
data = next(self.dataset_iter)
......@@ -128,10 +79,6 @@ class _MapDatasetFetcher(_DatasetFetcher):
else:
return None
global _WARNING_TO_LOG
if not isinstance(data[0], (Sequence, Mapping)) and _WARNING_TO_LOG:
self._log_warning()
_WARNING_TO_LOG = False
else:
data = self.dataset[batch_indices]
......
......@@ -12,12 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import paddle
import numbers
import numpy as np
from collections.abc import Mapping, Sequence
from collections.abc import Sequence, Mapping
import numpy as np
import paddle
FIELD_PREFIX = "_paddle_field_"
......@@ -38,7 +38,7 @@ def _flatten_batch(batch):
field,
(np.ndarray, paddle.Tensor, paddle.fluid.core.eager.Tensor),
):
structure.append('{}{}'.format(FIELD_PREFIX, field_idx))
structure.append(f'{FIELD_PREFIX}{field_idx}')
flat_batch.append(field)
field_idx += 1
elif isinstance(field, (str, bytes, numbers.Number)):
......@@ -61,7 +61,7 @@ def _flatten_batch(batch):
field,
(np.ndarray, paddle.Tensor, paddle.fluid.core.eager.Tensor),
):
structure[k] = '{}{}'.format(FIELD_PREFIX, field_idx)
structure[k] = f'{FIELD_PREFIX}{field_idx}'
flat_batch.append(field)
field_idx += 1
elif isinstance(field, (str, bytes, numbers.Number)):
......@@ -79,7 +79,7 @@ def _flatten_batch(batch):
else:
structure[k] = field
else:
raise TypeError("wrong flat data type: {}".format(type(batch)))
raise TypeError(f"wrong flat data type: {type(batch)}")
return structure, field_idx
......@@ -130,7 +130,7 @@ def _restore_batch(flat_batch, structure):
elif isinstance(field, (Sequence, Mapping)):
field_idx = _restore(structure[k], field_idx)
else:
raise TypeError("wrong flat data type: {}".format(type(structure)))
raise TypeError(f"wrong flat data type: {type(structure)}")
return field_idx
......@@ -145,7 +145,7 @@ def _restore_batch(flat_batch, structure):
if isinstance(structure, (str, bytes)):
assert structure == '{}{}'.format(
FIELD_PREFIX, 0
), "invalid structure: {}".format(structure)
), f"invalid structure: {structure}"
return flat_batch[0]
field_idx = _restore(structure, 0)
assert field_idx + 1 == len(flat_batch), "Tensor parse incomplete"
......
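These helpers are internal: _flatten_batch pulls ndarray/Tensor fields out of a nested batch, leaving '_paddle_field_<idx>' placeholders in the returned structure, and _restore_batch re-nests them. A hedged round-trip sketch, assuming the (structure, flat_fields) return order:

import numpy as np
from paddle.io.dataloader.flat import _flatten_batch, _restore_batch

batch = {"image": np.zeros((2, 3), dtype="float32"), "label": 7}
structure, flat = _flatten_batch(batch)  # assumed return order
# structure keeps non-tensor fields and placeholders,
# e.g. {"image": "_paddle_field_0", "label": 7}; flat holds the ndarray
restored = _restore_batch(flat, structure)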
......@@ -13,14 +13,8 @@
# limitations under the License.
import numpy as np
from .. import core
__all__ = [
"Sampler",
"SequenceSampler",
"RandomSampler",
"WeightedRandomSampler",
]
from ...framework import core
class Sampler:
......@@ -317,7 +311,7 @@ class WeightedRandomSampler(Sampler):
idxs = _weighted_sample(
self.weights, self.num_samples, self.replacement
)
return iter(idxs.reshape((-1)).tolist())
return iter(idxs.reshape(-1).tolist())
def __len__(self):
mul = np.prod(self.weights.shape) // self.weights.shape[-1]
......
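A short usage sketch of the sampler this hunk touches; the weights bias the draw toward index 3:

import numpy as np
from paddle.io import WeightedRandomSampler

sampler = WeightedRandomSampler(
    weights=np.array([0.1, 0.1, 0.1, 0.7], dtype="float32"),
    num_samples=4,
    replacement=True,
)
print(list(sampler))  # e.g. [3, 0, 3, 3]; drawn with replacement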
......@@ -13,25 +13,25 @@
# limitations under the License.
import os
# NOTE: queue has a different name in python2 and python3
import queue
import sys
import paddle
import numpy as np
import traceback
from collections import namedtuple
from .. import core
from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher
import numpy as np
import paddle
from ...framework import core
from ..multiprocess_utils import (
_cleanup_mmap,
CleanupFuncRegistrar,
MP_STATUS_CHECK_INTERVAL,
CleanupFuncRegistrar,
_cleanup_mmap,
)
from ..framework import _non_static_mode, _in_eager_without_dygraph_check
from .fetcher import _IterableDatasetFetcher, _MapDatasetFetcher
from .flat import _flatten_batch
import queue
__all__ = ['get_worker_info']
class _IterableDatasetStopIteration:
def __init__(self, worker_id):
......@@ -59,7 +59,7 @@ class _DatasetKind:
dataset, auto_collate_batch, collate_fn, drop_last
)
else:
raise NotImplementedError("unknown Dataset kind {}".format(kind))
raise NotImplementedError(f"unknown Dataset kind {kind}")
class ParentWatchDog:
......@@ -291,9 +291,9 @@ def _worker_loop(
# set different numpy seed for each worker
try:
import numpy as np
import time
import random
import numpy as np
except ImportError:
pass
else:
......
# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import atexit
# NOTE: queue has a different name in python2 and python3
import queue
import signal
import sys
from ..framework import core
# interval (seconds) at which multi-process workers check the indices
# queue, to avoid hanging in subprocess data loading
MP_STATUS_CHECK_INTERVAL = 5.0
# NOTE: [ mmap files clear ] If there is still data in the multiprocess queue when the main process finishes reading,
# the data in the queue needs to be popped. Then the LoDTensor read by the main process
# from the child process will automatically clear the memory-mapped file.
multiprocess_queue_set = set()
def _clear_multiprocess_queue_set():
global multiprocess_queue_set
for data_queue in multiprocess_queue_set:
while True:
try:
data_queue.get_nowait()
except queue.Empty:
break
# NOTE: main process clear function at exit
def _cleanup():
# NOTE: inter-process Queue shared memory objects clear function
_clear_multiprocess_queue_set()
# NOTE: main process memory map files clear function
core._cleanup_mmap_fds()
# NOTE: for child process clear function at exit
def _cleanup_mmap():
# clear memory map files in child process
core._cleanup_mmap_fds()
# NOTE: used to register a function to be executed at interpreter exit.
class CleanupFuncRegistrar:
# Record the cleanup functions that have been executed
_executed_func_set = set()
# Record the cleanup functions that have been registered
_registered_func_set = set()
@classmethod
def register(cls, function, signals=[]):
def _func_executor():
if function not in cls._executed_func_set:
try:
function()
finally:
cls._executed_func_set.add(function)
def _func_register(function):
if not callable(function):
raise TypeError("%s is not callable object." % (function))
# check function object whether hash-able {function}
if function not in cls._registered_func_set:
atexit.register(_func_exectuor)
cls._registered_func_set.add(function)
def _signal_handler(signum=None, frame=None):
_func_executor()
if signum is not None:
if signum == signal.SIGINT:
raise KeyboardInterrupt
sys.exit(signum)
def _signal_register(signals):
signals = set(signals)
for sig in signals:
orig_handler = signal.signal(sig, _signal_handler)
if orig_handler not in (signal.SIG_DFL, signal.SIG_IGN):
if (
sig == signal.SIGINT
and orig_handler is signal.default_int_handler
):
continue
if orig_handler not in cls._registered_func_set:
atexit.register(orig_handler)
cls._registered_func_set.add(orig_handler)
# deal with signals
_signal_register(signals)
# deal with function
_func_register(function)
# NOTE: [ mmap files clear ] When the main process exits unexpectedly, the remaining
# shared memory objects in the inter-process Queue and the main process (mostly in the
# BlockingQueue) may not be completely released, resulting in the corresponding
# memory-mapped file remaining on the disk (/dev/shm), so register this function
# to clean up shared memory objects in these two queues before the python interpreter exits.
# NOTE: Currently multi-process DataLoader only supports Linux platform
if not (sys.platform == 'darwin' or sys.platform == 'win32'):
CleanupFuncRegistrar.register(_cleanup)
# ------------ SIGCHLD handler setting --------------
_SIGCHLD_handler_set = False
def _set_SIGCHLD_handler():
global _SIGCHLD_handler_set
if _SIGCHLD_handler_set:
return
current_handler = signal.getsignal(signal.SIGCHLD)
if not callable(current_handler):
current_handler = None
def __handler__(signum, frame):
# NOTE: Here the signum is SIGCHLD, when the child process exits,
# this handler will be called whenever the child process exits
# normally or abnormally.
core._throw_error_if_process_failed()
if current_handler is not None:
current_handler(signum, frame)
signal.signal(signal.SIGCHLD, __handler__)
_SIGCHLD_handler_set = True
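CleanupFuncRegistrar is an internal utility; a hedged sketch of registering a cleanup hook that runs at most once, at interpreter exit or on SIGTERM (the module path paddle.io.multiprocess_utils is inferred from the relative imports above):

import signal
from paddle.io.multiprocess_utils import CleanupFuncRegistrar

def close_shared_queues():  # hypothetical cleanup hook
    print("draining shared-memory queues")

# registered both with atexit and as a SIGTERM handler; _executed_func_set
# guarantees the hook body executes at most once
CleanupFuncRegistrar.register(close_shared_queues, signals=[signal.SIGTERM])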
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
import multiprocessing
import sys
import time
import warnings
import paddle
from paddle.fluid.framework import logging
from ..fluid.framework import (
_current_expected_place,
_get_paddle_place,
_get_paddle_place_list,
_non_static_mode,
)
from ..framework import core
from .dataloader import BatchSampler, IterableDataset, Subset
from .dataloader.batch_sampler import _InfiniteIterableSampler
from .dataloader.dataloader_iter import (
_DataLoaderIterMultiProcess,
_DataLoaderIterSingleProcess,
_DatasetKind,
)
# NOTE: [ avoid hanging & failed quickly ]
# These values are used when getting data from another process
QUEUE_GET_TIMEOUT = 60
USE_PINNED_MEMORY = None
# AutoTune Flags
USE_AUTOTUNE = False
TUNING_STEPS = 500
def set_autotune_config(use_autotune, tuning_steps=500):
global USE_AUTOTUNE
USE_AUTOTUNE = use_autotune
global TUNING_STEPS
TUNING_STEPS = tuning_steps
def use_pinned_memory(*args):
global USE_PINNED_MEMORY
if len(args) == 0:
return USE_PINNED_MEMORY
else:
assert len(args) == 1 and isinstance(args[0], bool)
USE_PINNED_MEMORY = args[0]
def _convert_places(places):
if not isinstance(places, (list, tuple)):
places = [places]
ret = []
for p in places:
if not isinstance(p, core.Place):
tmp = core.Place()
tmp.set_place(p)
p = tmp
ret.append(p)
return ret
class AuToTune:
def __init__(self, loader):
self.loader = loader
self.max_num_worker = multiprocessing.cpu_count() // 2
def __call__(self):
# use default loader
if (not USE_AUTOTUNE) or (not self.need_autotune()):
return self.loader.num_workers
# get autotune loader
auto_tune_loader = self.get_autotune_loader()
if auto_tune_loader is None:
return self.loader.num_workers
# pick the best num_workers
auto_tune_start = time.time()
logging.debug("========= DataLoader Auto Tune =========")
logging.debug(
"User config for DataLoader: " + str(self.loader.num_workers)
)
best_num_workers = 0
min_cost = float("inf")
logging.debug(
"Tuning Range for num_workers: 0 ~ " + str(self.max_num_worker)
)
num_workers = 0
while num_workers < self.max_num_worker:
auto_tune_loader.num_workers = num_workers
avg_cost = self.evaluate_reader_cost(auto_tune_loader)
if min_cost * 0.75 > avg_cost:
min_cost = avg_cost
best_num_workers = num_workers
else:
update_num = self.is_best(
auto_tune_loader,
best_num_workers,
min_cost,
self.max_num_worker,
)
if update_num == best_num_workers:
break
else:
best_num_workers = update_num
logging.debug(
"num_workers: "
+ str(num_workers)
+ " avg_cost: "
+ str(avg_cost)
)
num_workers += 2
logging.info(
"auto_tune dataLoader best_num_workers: " + str(best_num_workers)
)
logging.debug(
"AutoTuning Cost for DataLoader: "
+ str(time.time() - auto_tune_start)
+ ' seconds'
)
# tune the default loader's num_workers
return best_num_workers
def need_autotune(self):
if sys.platform == 'darwin' or sys.platform == 'win32':
return False
else:
return True
def get_sub_dataset(self, dataset, batch_size):
num_samples = min(batch_size * TUNING_STEPS, len(dataset))
sub_dataset = Subset(dataset, indices=list(range(num_samples)))
return sub_dataset
def get_autotune_loader(self):
loader = copy.copy(self.loader)
batch_size = self.loader.batch_sampler.batch_size
if isinstance(
self.loader.batch_sampler, paddle.io.DistributedBatchSampler
):
dataset = self.loader.batch_sampler.dataset
sub_dataset = self.get_sub_dataset(dataset, batch_size)
loader.batch_sampler = paddle.io.DistributedBatchSampler(
dataset=sub_dataset,
batch_size=batch_size,
num_replicas=self.loader.batch_sampler.nranks,
rank=self.loader.batch_sampler.local_rank,
shuffle=self.loader.batch_sampler.shuffle,
drop_last=self.loader.batch_sampler.drop_last,
)
elif isinstance(self.loader.batch_sampler, paddle.io.BatchSampler):
dataset = self.loader.batch_sampler.sampler.data_source
sub_dataset = self.get_sub_dataset(dataset, batch_size)
loader.batch_sampler = paddle.io.BatchSampler(
dataset=sub_dataset,
batch_size=batch_size,
drop_last=self.loader.batch_sampler.drop_last,
)
else:
loader = None
return loader
def evaluate_reader_cost(self, reader):
costs = []
avg_cost = 0
start = time.time()
for i, data in enumerate(reader):
costs.append(time.time() - start)
start = time.time()
if len(costs) > 2:
avg_cost = sum(costs[2:]) / len(costs[2:])
else:
avg_cost = sum(costs[0:]) / len(costs[0:])
return avg_cost
def is_best(self, reader, best_workers, best_time, num_work_boundary):
step = 0
num_workers = best_workers + 1
boundary = 1
while num_workers < num_work_boundary and step < 5:
self.loader.num_workers = num_workers
cost = self.evaluate_reader_cost(reader)
logging.debug(
"for back num_workers: "
+ str(num_workers)
+ " avg_cost: "
+ str(cost)
)
step += 1
if cost < best_time * 0.70 * boundary:
return num_workers
else:
num_workers += 1
boundary *= 0.80
return best_workers
class DataLoader:
"""
DataLoader provides an iterator which iterates the given dataset
once by the batch_sampler.
DataLoader supports single-process and multi-process data loading:
multi-process workers will be used to load data asynchronously if
:attr:`num_workers` is set as a positive number.
DataLoader supports map-style datasets and iterable-style datasets.
For a map-style dataset (can get a sample from dataset with a given
index), please see :code:`paddle.io.Dataset`.
For an iterable-style dataset (get samples from dataset iteratively,
like a Python iterator), please see :code:`paddle.io.IterableDataset`.
For :code:`batch_sampler` please see :code:`paddle.io.BatchSampler`.
.. note::
GPU tensor operation is not supported in subprocess currently,
please don't use GPU tensor operations in the pipeline which will
be performed in subprocess, such as dataset transforms, collate_fn,
etc. Numpy array and CPU tensor operations are supported.
**Disable automatic batching**
In certain cases, such as some NLP tasks, users need to handle
batching manually in the dataset instead of using automatic batching.
For these cases, automatic batching is disabled if both :attr:`batch_size`
and :attr:`batch_sampler` are set as None; each item fetched from
:attr:`dataset` should already be batched data, and will be processed
with the function defined by :attr:`collate_fn` or :attr:`default_collate_fn`.
.. note::
When automatic batching is disabled, :attr:`default_collate_fn` will
do nothing to data from the dataset.
Args:
dataset(Dataset): the dataset to load data from, should be an
instance of subclass of :code:`paddle.io.Dataset` or
:code:`paddle.io.IterableDataset`.
feed_list (list(Tensor)|tuple(Tensor), optional): feed Tensor list.
The Tensors should be created by :code:`paddle.static.data()`.
:attr:`feed_list` must be set if :attr:`return_list` is
False. Default None.
places(list(Place)|tuple(Place)|list(str), optional): a list of Place
to put data onto. :attr:`places` can be None; if
:attr:`places` is None, the default place (CPUPlace or CUDAPlace(0))
will be used. Default None. If ``places`` is a list of string,
the strings in the list can be ``cpu``, ``gpu:x`` and ``gpu_pinned``,
where ``x`` is the index of the GPUs.
return_list (bool, optional): whether the return value on each device is
presented as a list. If :attr:`return_list=False`, the return
value on each device would be a dict of str -> Tensor, where
the key of the dict is the name of each fed Tensor. If
:attr:`return_list=True`, the return value on each device would
be a list(Tensor). :attr:`return_list` can only be True
in dynamic graph mode. Default True.
batch_sampler(BatchSampler, optional): an instance of `paddle.io.BatchSampler`
to generate batch indices to draw samples from :attr:`dataset`
and combine them into batches. Default None.
batch_size(int|None, optional): sample number in a mini-batch, a substitution
parameter for :attr:`batch_sampler`; if :attr:`batch_sampler`
is not set, a default `paddle.io.BatchSampler` will be used
and initialized by :attr:`batch_size`, :attr:`shuffle` and
:attr:`drop_last`. Default 1.
shuffle(bool, optional): whether to shuffle indices order before generating
batch indices, a substitution parameter for :attr:`batch_sampler`,
see :attr:`batch_size`. Default False.
drop_last(bool, optional): whether to drop the last incomplete batch when the
dataset size is not divisible by the batch size, a substitution parameter
for :attr:`batch_sampler`, see :attr:`batch_size`. Default False.
collate_fn(callable, optional): function to generate mini-batch data by merging
the sample list, None for only stacking each field of the samples on axis
0 (same as :attr:`np.stack(..., axis=0)`). Default None.
num_workers(int, optional): the number of subprocesses to load data, 0 for no
subprocess used and loading data in the main process. Default 0.
use_buffer_reader (bool, optional): whether to use buffered reader.
If use_buffer_reader=True, the DataLoader will prefetch
batch data asynchronously, so it speeds up data feeding
and occupies a little more CPU or GPU memory, i.e., the memory
of one batch input data. Default True.
prefetch_factor (int, optional): number of batches the DataLoader will prefetch
if use_buffer_reader=True. Default 2.
use_shared_memory (bool, optional): whether to use shared memory to speed up
putting data into the inter-process queue; set :attr:`use_shared_memory`
as True only when the shared memory space on your machine (e.g.
the space of '/dev/shm' on Linux) is large enough.
Shared memory will only be enabled in multi-process mode (num_workers
> 0). Default True.
timeout(int, optional): the timeout value for getting data from the output queue
of subprocesses. Default 0.
worker_init_fn(callable, optional): init function which will be called with
worker id on each subprocess starting if not set as None. Default
None.
Returns:
DataLoader: an iterable object for data iterating, each element of the generated data is a Tensor.
Examples:
.. code-block:: python
import numpy as np
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
from paddle.io import Dataset, BatchSampler, DataLoader
BATCH_NUM = 20
BATCH_SIZE = 16
EPOCH_NUM = 4
IMAGE_SIZE = 784
CLASS_NUM = 10
# define a random dataset
class RandomDataset(Dataset):
def __init__(self, num_samples):
self.num_samples = num_samples
def __getitem__(self, idx):
image = np.random.random([IMAGE_SIZE]).astype('float32')
label = np.random.randint(0, CLASS_NUM - 1, (1, )).astype('int64')
return image, label
def __len__(self):
return self.num_samples
dataset = RandomDataset(BATCH_NUM * BATCH_SIZE)
class SimpleNet(nn.Layer):
def __init__(self):
super().__init__()
self.fc = nn.Linear(IMAGE_SIZE, CLASS_NUM)
def forward(self, image, label=None):
return self.fc(image)
simple_net = SimpleNet()
opt = paddle.optimizer.SGD(learning_rate=1e-3,
parameters=simple_net.parameters())
loader = DataLoader(dataset,
batch_size=BATCH_SIZE,
shuffle=True,
drop_last=True,
num_workers=2)
for e in range(EPOCH_NUM):
for i, (image, label) in enumerate(loader()):
out = simple_net(image)
loss = F.cross_entropy(out, label)
avg_loss = paddle.mean(loss)
avg_loss.backward()
opt.minimize(avg_loss)
simple_net.clear_gradients()
print("Epoch {} batch {}: loss = {}".format(e, i, np.mean(loss.numpy())))
.. note::
For reading an iterable dataset with a multi-process DataLoader,
please see :code:`paddle.io.IterableDataset`.
"""
def __init__(
self,
dataset,
feed_list=None,
places=None,
return_list=True,
batch_sampler=None,
batch_size=1,
shuffle=False,
drop_last=False,
collate_fn=None,
num_workers=0,
use_buffer_reader=True,
prefetch_factor=2,
use_shared_memory=True,
timeout=0,
worker_init_fn=None,
persistent_workers=False,
):
self.return_list = return_list
self.collate_fn = collate_fn
self.use_buffer_reader = use_buffer_reader
self.prefetch_factor = prefetch_factor
self.worker_init_fn = worker_init_fn
self.dataset = dataset
if not return_list and not _non_static_mode():
assert (
feed_list is not None
), "feed_list should be set when return_list=False"
self.feed_list = feed_list
if places is None:
places = _current_expected_place()
if isinstance(places, (list, tuple)):
places = _get_paddle_place_list(places)
else:
places = _get_paddle_place(places)
self.places = _convert_places(places)
assert num_workers >= 0, "num_workers should be a non-negative value"
if num_workers > 0 and (
sys.platform == 'darwin' or sys.platform == 'win32'
):
warnings.warn(
"DataLoader with multi-process mode is not supported on MacOs and Windows currently."
" Please use signle-process mode with num_workers = 0 instead"
)
num_workers = 0
self.num_workers = num_workers
assert prefetch_factor > 0, "prefetch_factor should be a positive value"
self.use_shared_memory = use_shared_memory
if use_shared_memory and num_workers == 0:
self.use_shared_memory = False
assert timeout >= 0, "timeout should be a non-negative value"
self.timeout = timeout
if isinstance(dataset, IterableDataset):
self.dataset_kind = _DatasetKind.ITER
if shuffle:
raise ValueError(
f"IterableDataset does not support shuffle, but got shuffle={shuffle}"
)
if batch_sampler is not None:
raise ValueError(
"IterableDataset expect unspecified batch_sampler"
)
else:
self.dataset_kind = _DatasetKind.MAP
if batch_sampler is not None:
assert batch_size == 1 and not shuffle and not drop_last, (
"batch_size/shuffle/drop_last should not be set when "
"batch_sampler is given"
)
self.batch_sampler = batch_sampler
self.batch_size = None
elif batch_size is None:
self.batch_sampler = None
self.batch_size = None
else:
assert batch_size > 0, (
"batch_size should be None or a positive value when "
"batch_sampler is not given"
)
self.batch_size = batch_size
if isinstance(dataset, IterableDataset):
self.batch_sampler = _InfiniteIterableSampler(
dataset, batch_size
)
else:
self.batch_sampler = BatchSampler(
dataset=dataset,
batch_size=batch_size,
shuffle=shuffle,
drop_last=drop_last,
)
self.drop_last = drop_last
self.auto_collate_batch = self.batch_sampler is not None
self.pin_memory = False
if _non_static_mode():
self.pin_memory = (
True if use_pinned_memory() is None else use_pinned_memory()
)
self._persistent_workers = persistent_workers
self._iterator = None
self.num_workers = AuToTune(self).__call__()
def __len__(self):
if self.dataset_kind == _DatasetKind.ITER:
raise ValueError("length of IterableDataset not supported")
else:
if self.auto_collate_batch:
return len(self.batch_sampler)
else:
return len(self.dataset)
def __iter__(self):
if self.num_workers == 0:
return _DataLoaderIterSingleProcess(self)
elif self._persistent_workers:
if self._iterator is None:
self._iterator = _DataLoaderIterMultiProcess(self)
else:
self._iterator._reset()
return self._iterator
else:
return _DataLoaderIterMultiProcess(self)
def __call__(self):
return self.__iter__()
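Note that the constructor's final step routes num_workers through AuToTune, so with autotuning enabled the requested value is only a starting point. A small sketch, assuming dynamic graph mode:

import numpy as np
from paddle.io import DataLoader, Dataset

class Rand(Dataset):
    def __getitem__(self, idx):
        return np.random.random([4]).astype("float32")

    def __len__(self):
        return 64

loader = DataLoader(Rand(), batch_size=16, num_workers=4)
# with autotune on (see set_autotune_config above), AuToTune may override
# this value after probing reader cost on a Subset of the dataset
print(loader.num_workers)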
......@@ -620,7 +620,7 @@ class PostTrainingQuantization:
self._batch_nums if self._batch_nums else len(self._data_loader)
)
return
self._data_loader = io.DataLoader.from_generator(
self._data_loader = reader.DataLoader.from_generator(
feed_list=feed_vars, capacity=3 * self._batch_size, iterable=True
)
if self._sample_generator is not None:
......
......@@ -445,7 +445,6 @@ packages=['paddle',
'paddle.fluid.proto',
'paddle.fluid.proto.profiler',
'paddle.fluid.layers',
'paddle.fluid.dataloader',
'paddle.fluid.contrib',
'paddle.fluid.contrib.extend_optimizer',
'paddle.fluid.incubate',
......@@ -492,6 +491,7 @@ packages=['paddle',
'paddle.sparse.nn.functional',
'paddle.incubate.xpu',
'paddle.io',
'paddle.io.dataloader',
'paddle.optimizer',
'paddle.nn',
'paddle.nn.functional',
......
......@@ -1421,7 +1421,6 @@ def get_setup_parameters():
'paddle.fluid.proto',
'paddle.fluid.proto.profiler',
'paddle.fluid.layers',
'paddle.fluid.dataloader',
'paddle.fluid.contrib',
'paddle.fluid.contrib.extend_optimizer',
'paddle.fluid.incubate',
......@@ -1468,6 +1467,7 @@ def get_setup_parameters():
'paddle.sparse.nn.functional',
'paddle.incubate.xpu',
'paddle.io',
'paddle.io.dataloader',
'paddle.optimizer',
'paddle.nn',
'paddle.nn.functional',
......
......@@ -109,7 +109,7 @@ def mlp_pretrain_forward(train_program, start_program):
error_cost = paddle.nn.functional.square_error_cost(predict, label)
loss = paddle.mean(error_cost)
loader = paddle.io.DataLoader.from_generator(
loader = paddle.fluid.io.DataLoader.from_generator(
feed_list=[input, label], capacity=4 * batch_size, iterable=True
)
......
......@@ -297,7 +297,7 @@ def train_builtin_data_vars():
with static.program_guard(engine.main_program, engine.startup_program):
feed_list = engine.inputs + engine.labels
print(feed_list)
loader = paddle.io.DataLoader.from_generator(
loader = paddle.fluid.io.DataLoader.from_generator(
feed_list=feed_list, capacity=4 * batch_size, iterable=False
)
......@@ -324,7 +324,7 @@ def train_non_builtin_data_vars():
)
label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
loader = paddle.io.DataLoader.from_generator(
loader = paddle.fluid.io.DataLoader.from_generator(
feed_list=[input, label], capacity=4 * batch_size, iterable=False
)
places = static.cuda_places()
......@@ -383,7 +383,7 @@ def get_cost():
)
label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
loader = paddle.io.DataLoader.from_generator(
loader = paddle.fluid.io.DataLoader.from_generator(
feed_list=[input, label], capacity=4 * batch_size, iterable=False
)
places = static.cuda_places()
......@@ -434,7 +434,7 @@ def get_cost_by_default_program():
)
label = static.data(name="label", shape=[batch_size, 1], dtype='int64')
loader = paddle.io.DataLoader.from_generator(
loader = paddle.fluid.io.DataLoader.from_generator(
feed_list=[input, label], capacity=4 * batch_size, iterable=False
)
places = static.cuda_places()
......
......@@ -130,7 +130,7 @@ def get_program():
)
data_holder = [input, label]
# dataloader
dataloader = paddle.io.DataLoader.from_generator(
dataloader = paddle.fluid.io.DataLoader.from_generator(
feed_list=data_holder, capacity=4 * batch_size, iterable=False
)
dataloader.set_batch_generator(
......
......@@ -112,7 +112,7 @@ def get_program():
)
data_holder = [input, label]
# dataloader
dataloader = paddle.io.DataLoader.from_generator(
dataloader = paddle.fluid.io.DataLoader.from_generator(
feed_list=data_holder, capacity=4 * batch_size, iterable=False
)
dataloader.set_batch_generator(
......
......@@ -124,7 +124,7 @@ def get_program():
)
data_holder = [input, label]
# dataloader
dataloader = paddle.io.DataLoader.from_generator(
dataloader = paddle.fluid.io.DataLoader.from_generator(
feed_list=data_holder, capacity=4 * batch_size, iterable=False
)
dataloader.set_batch_generator(
......
......@@ -148,7 +148,7 @@ def get_program():
)
data_holder = [input, label]
# dataloader
dataloader = paddle.io.DataLoader.from_generator(
dataloader = paddle.fluid.io.DataLoader.from_generator(
feed_list=data_holder, capacity=4 * batch_size, iterable=False
)
dataloader.set_batch_generator(
......
......@@ -136,7 +136,7 @@ def get_program():
data_holder = [input, label]
# dataloader
dataloader = paddle.io.DataLoader.from_generator(
dataloader = fluid.io.DataLoader.from_generator(
feed_list=data_holder, capacity=4 * batch_size, iterable=False
)
dataloader.set_batch_generator(
......
......@@ -255,7 +255,7 @@ class TestResnet(unittest.TestCase):
batch_size=batch_size,
drop_last=True,
)
data_loader = paddle.io.DataLoader.from_generator(
data_loader = paddle.fluid.io.DataLoader.from_generator(
capacity=5, iterable=True
)
data_loader.set_sample_list_generator(train_reader)
......
......@@ -132,7 +132,7 @@ def train(conf_dict, to_static):
global_step = 0
losses = []
train_loader = paddle.io.DataLoader.from_generator(
train_loader = paddle.fluid.io.DataLoader.from_generator(
capacity=16, return_list=True, iterable=True, use_double_buffer=True
)
get_train_examples = simnet_process.get_reader("train", epoch=args.epoch)
......
......@@ -199,13 +199,13 @@ class TestModel(unittest.TestCase):
mode='test', return_label=False, sample_num=sp_num
)
cls.train_loader = fluid.io.DataLoader(
cls.train_loader = paddle.io.DataLoader(
cls.train_dataset, places=cls.device, batch_size=64
)
cls.val_loader = fluid.io.DataLoader(
cls.val_loader = paddle.io.DataLoader(
cls.val_dataset, places=cls.device, batch_size=64
)
cls.test_loader = fluid.io.DataLoader(
cls.test_loader = paddle.io.DataLoader(
cls.test_dataset, places=cls.device, batch_size=64
)
......@@ -322,14 +322,14 @@ class TestModel(unittest.TestCase):
rank=rank,
)
train_loader = fluid.io.DataLoader(
train_loader = paddle.io.DataLoader(
self.train_dataset,
batch_sampler=train_sampler,
places=self.device,
return_list=True,
)
val_loader = fluid.io.DataLoader(
val_loader = paddle.io.DataLoader(
self.val_dataset,
batch_sampler=val_sampler,
places=self.device,
......@@ -375,14 +375,14 @@ class TestModel(unittest.TestCase):
rank=rank,
)
train_loader = fluid.io.DataLoader(
train_loader = paddle.io.DataLoader(
self.train_dataset,
batch_sampler=train_sampler,
places=self.device,
return_list=True,
)
val_loader = fluid.io.DataLoader(
val_loader = paddle.io.DataLoader(
self.val_dataset,
batch_sampler=val_sampler,
places=self.device,
......@@ -404,7 +404,7 @@ class TestModel(unittest.TestCase):
self.val_dataset, batch_size=64, shuffle=False
)
val_loader = fluid.io.DataLoader(
val_loader = paddle.io.DataLoader(
self.val_dataset,
batch_sampler=sampler,
places=self.device,
......@@ -432,7 +432,7 @@ class TestModel(unittest.TestCase):
self.test_dataset, batch_size=64, shuffle=False
)
test_loader = fluid.io.DataLoader(
test_loader = paddle.io.DataLoader(
self.test_dataset,
batch_sampler=sampler,
places=self.device,
......