# Copyright (c) 2020 PaddlePaddle Authors. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import numpy as np import math from .sampler import Sampler, SequenceSampler, RandomSampler from .dataset import Dataset, IterableDataset __all__ = ["BatchSampler", "DistributedBatchSampler"] class BatchSampler(Sampler): """ A base implement of batch sampler used by `paddle.io.DataLoader` which yield mini-batch indices(a list/tuple with length as mini-batch size and holds sample indices) iterably. Batch sampler used by :code:`paddle.io.DataLoader` should be a subclass of :code:`paddle.io.BatchSampler`, BatchSampler subclasses should implement following methods: :code:`__iter__`: return mini-batch indices iterably. :code:`__len__`: get mini-batch number in an epoch. Args: dataset(Dataset, optional): this should be an instance of a subclass of :ref:`api_paddle_io_Dataset` or :ref:`api_paddle_io_IterableDataset` or other python object which implemented :code:`__len__` for BatchSampler to get indices as the range of :attr:`dataset` length. Default None, disabled. sampler (Sampler, optional): this should be a :ref:`api_paddle_io_Sample` instance which implemented :code:`__iter__` to generate sample indices. :attr:`sampler` and :attr:`dataset` can not be set in the same time. If :attr:`sampler` is set, :attr:`dataset` should not be set. Default None, disabled. shuffle(bool, optional): whether to shuffle indices order before generating batch indices. Default False, don't shuffle indices before generating batch indices. batch_size(int, optional): sample indice number in a mini-batch indices. default 1, each mini-batch includes 1 sample. drop_last(bool, optional): whether drop the last incomplete (less than 1 mini-batch) batch dataset. Default False, keep it. see :ref:`api_paddle_io_DataLoader` Returns: BatchSampler: an iterable object for indices iterating Examples: .. code-block:: python from paddle.io import RandomSampler, BatchSampler, Dataset # init with dataset class RandomDataset(Dataset): def __init__(self, num_samples): self.num_samples = num_samples def __getitem__(self, idx): image = np.random.random([784]).astype('float32') label = np.random.randint(0, 9, (1, )).astype('int64') return image, label def __len__(self): return self.num_samples bs = BatchSampler(dataset=RandomDataset(100), shuffle=False, batch_size=16, drop_last=False) for batch_indices in bs: print(batch_indices) # init with sampler sampler = RandomSampler(RandomDataset(100)) bs = BatchSampler(sampler=sampler, batch_size=8, drop_last=True) for batch_indices in bs: print(batch_indices) """ def __init__( self, dataset=None, sampler=None, shuffle=False, batch_size=1, drop_last=False, ): if dataset is None: assert ( sampler is not None ), "either dataset or sampler should be set" assert isinstance( sampler, Sampler ), "sampler should be a paddle.io.Sampler, but got {}".format( type(sampler) ) assert not shuffle, "shuffle should be False when sampler is set" self.sampler = sampler else: assert not isinstance( dataset, IterableDataset ), "dataset should not be a paddle.io.IterableDataset" assert sampler is None, "should not set both dataset and sampler" assert isinstance( shuffle, bool ), "shuffle should be a boolean value, but got {}".format( type(shuffle) ) if shuffle: self.sampler = RandomSampler(dataset) else: self.sampler = SequenceSampler(dataset) assert ( isinstance(batch_size, int) and batch_size > 0 ), "batch_size should be a positive integer, but got {}".format( batch_size ) self.batch_size = batch_size assert isinstance( drop_last, bool ), "drop_last should be a boolean value, but got {}".format( type(drop_last) ) self.drop_last = drop_last def __iter__(self): batch_indices = [] for idx in self.sampler: batch_indices.append(idx) if len(batch_indices) == self.batch_size: yield batch_indices batch_indices = [] if not self.drop_last and len(batch_indices) > 0: yield batch_indices def __len__(self): num_samples = len(self.sampler) num_samples += int(not self.drop_last) * (self.batch_size - 1) return num_samples // self.batch_size class _InfiniteIterableSampler: def __init__(self, dataset, batch_size=1): assert isinstance( dataset, IterableDataset ), "dataset should be an instance of paddle.io.IterableDataset" self.dataset = dataset self.batch_size = batch_size def __iter__(self): while True: yield [None] * self.batch_size class DistributedBatchSampler(BatchSampler): """Sampler that restricts data loading to a subset of the dataset. In such case, each process can pass a DistributedBatchSampler instance as a DataLoader sampler, and load a subset of the original dataset that is exclusive to it. .. note:: Dataset is assumed to be of constant size. Args: dataset(Dataset): this could be an instance of subclass of :ref:`api_paddle_io_Dataset` or other python object which implemented `__len__` for BatchSampler to get indices of samples. batch_size(int): sample size of each mini-batch. num_replicas(int, optional): porcess number in distributed training. If :attr:`num_replicas` is None, :attr:`num_replicas` will be retrieved from :ref:`api_paddle_distributed_ParallelEnv` . Default None. rank(int, optional): the rank of the current process among :attr:`num_replicas` processes. If :attr:`rank` is None, :attr:`rank` is retrieved from :ref:`api_paddle_distributed_ParallelEnv`. Default None. shuffle(bool, optional): whther to shuffle indices order before genrating batch indices. Default False. drop_last(bool, optional): whether drop the last incomplete(less than a mini-batch) batch dataset size. Default False. Returns: DistributedBatchSampler, return an iterable object for indices iterating. Examples: .. code-block:: python import numpy as np from paddle.io import Dataset, DistributedBatchSampler # init with dataset class RandomDataset(Dataset): def __init__(self, num_samples): self.num_samples = num_samples def __getitem__(self, idx): image = np.random.random([784]).astype('float32') label = np.random.randint(0, 9, (1, )).astype('int64') return image, label def __len__(self): return self.num_samples dataset = RandomDataset(100) sampler = DistributedBatchSampler(dataset, batch_size=64) for data in sampler: # do something break """ def __init__( self, dataset, batch_size, num_replicas=None, rank=None, shuffle=False, drop_last=False, ): self.dataset = dataset assert ( isinstance(batch_size, int) and batch_size > 0 ), "batch_size should be a positive integer" self.batch_size = batch_size assert isinstance(shuffle, bool), "shuffle should be a boolean value" self.shuffle = shuffle assert isinstance( drop_last, bool ), "drop_last should be a boolean number" from paddle.distributed import ParallelEnv if num_replicas is not None: assert ( isinstance(num_replicas, int) and num_replicas > 0 ), "num_replicas should be a positive integer" self.nranks = num_replicas else: self.nranks = ParallelEnv().nranks if rank is not None: assert ( isinstance(rank, int) and rank >= 0 ), "rank should be a non-negative integer" self.local_rank = rank else: self.local_rank = ParallelEnv().local_rank self.drop_last = drop_last self.epoch = 0 self.num_samples = int(math.ceil(len(self.dataset) * 1.0 / self.nranks)) self.total_size = self.num_samples * self.nranks def __iter__(self): num_samples = len(self.dataset) indices = np.arange(num_samples).tolist() indices += indices[: (self.total_size - len(indices))] assert len(indices) == self.total_size if self.shuffle: np.random.RandomState(self.epoch).shuffle(indices) self.epoch += 1 # subsample def _get_indices_by_batch_size(indices): subsampled_indices = [] last_batch_size = self.total_size % (self.batch_size * self.nranks) assert last_batch_size % self.nranks == 0 last_local_batch_size = last_batch_size // self.nranks for i in range( self.local_rank * self.batch_size, len(indices) - last_batch_size, self.batch_size * self.nranks, ): subsampled_indices.extend(indices[i : i + self.batch_size]) indices = indices[len(indices) - last_batch_size :] subsampled_indices.extend( indices[ self.local_rank * last_local_batch_size : (self.local_rank + 1) * last_local_batch_size ] ) return subsampled_indices if self.nranks > 1: indices = _get_indices_by_batch_size(indices) assert len(indices) == self.num_samples _sample_iter = iter(indices) batch_indices = [] for idx in _sample_iter: batch_indices.append(idx) if len(batch_indices) == self.batch_size: yield batch_indices batch_indices = [] if not self.drop_last and len(batch_indices) > 0: yield batch_indices def __len__(self): num_samples = self.num_samples num_samples += int(not self.drop_last) * (self.batch_size - 1) return num_samples // self.batch_size def set_epoch(self, epoch): """ Sets the epoch number. When :attr:`shuffle=True`, this number is used as seeds of random numbers. By default, users may not set this, all replicas (workers) use a different random ordering for each epoch. If set same number at each epoch, this sampler will yield the same ordering at all epoches. Arguments: epoch (int): Epoch number. Examples: .. code-block:: python import numpy as np from paddle.io import Dataset, DistributedBatchSampler # init with dataset class RandomDataset(Dataset): def __init__(self, num_samples): self.num_samples = num_samples def __getitem__(self, idx): image = np.random.random([784]).astype('float32') label = np.random.randint(0, 9, (1, )).astype('int64') return image, label def __len__(self): return self.num_samples dataset = RandomDataset(100) sampler = DistributedBatchSampler(dataset, batch_size=64) for epoch in range(10): sampler.set_epoch(epoch) """ self.epoch = epoch