distributed_reader.py 2.0 KB
Newer Older
C
chengduo 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
#   Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import print_function
import os

__all__ = ["distributed_sampler"]


def distributed_sampler(reader, batch_size):
    """
    Create a distributed reader.

    :param reader: the data reader to read from.
    :type reader: callable
    :param batch_size: the size of the batch
    :type batch_size: int
    """

    def batch_reader():
        if not os.getenv('PADDLE_TRAINER_ID'):
            raise RuntimeError(
                "The current program is not in distributed mode.")

        trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
        trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))

        def _slice_data(size):
            per_node_lines = size // trainer_count
            return [
                trainer_id * per_node_lines, (trainer_id + 1) * per_node_lines
            ]

        r = reader()
        b = []

        for instance in r:
            b.append(instance)
            if len(b) == batch_size:
                if len(b) >= trainer_count:
                    begin, end = _slice_data(len(b))
                    yield b[begin:end]
                b = []

        if len(b) >= trainer_count:
            begin, end = _slice_data(len(b))
            yield b[begin:end]

    # Batch size check
    batch_size = int(batch_size)
    if batch_size <= 0:
        raise ValueError("batch_size should be a positive integeral value, "
                         "but got batch_size={}".format(batch_size))

    return batch_reader