diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index 7152a0d20720ee92f8aa50873a9ba22246003efe..ba1454fd6be0e50118a9604d234aabd64210c313 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -400,6 +400,7 @@ paddle.fluid.contrib.QuantizeTranspiler.training_transpile (ArgSpec(args=['self' paddle.fluid.contrib.Calibrator.__init__ (ArgSpec(args=['self'], varargs='args', keywords='kwargs', defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.contrib.Calibrator.sample_data (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '3b8c85ca1e2cf753cc8c90a6c6992958')) paddle.fluid.contrib.Calibrator.save_int8_model (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) +paddle.fluid.contrib.distributed_sampler (ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None), ('document', '9a271cd9700deb6d837ed724ba094315')) paddle.fluid.contrib.reader.ctr_reader.ctr_reader (ArgSpec(args=['feed_dict', 'file_type', 'file_format', 'dense_slot_index', 'sparse_slot_index', 'capacity', 'thread_num', 'batch_size', 'file_list', 'slots', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b2ebf3de2a6ef1af2c3b88d2db7591ab')) paddle.fluid.contrib.Compressor.__init__ (ArgSpec(args=['self', 'place', 'scope', 'train_program', 'train_reader', 'train_feed_list', 'train_fetch_list', 'eval_program', 'eval_reader', 'eval_feed_list', 'eval_fetch_list', 'teacher_programs', 'checkpoint_path', 'train_optimizer', 'distiller_optimizer'], varargs=None, keywords=None, defaults=(None, None, None, None, None, None, None, [], './checkpoints', None, None)), ('document', '31ae143830c9bf6b43547dd546c5ba80')) paddle.fluid.contrib.Compressor.config (ArgSpec(args=['self', 'config_file'], varargs=None, keywords=None, defaults=None), ('document', '780d9c007276ccbb95b292400d7807b0')) diff --git a/python/paddle/fluid/contrib/reader/README.md b/python/paddle/fluid/contrib/reader/README.md index 9e4b7d1ce3d9664495220d7ccfc6ef6eac0b81c2..07c5430916a92c548f413b0069bdbf4d9896bb49 100644 --- a/python/paddle/fluid/contrib/reader/README.md +++ b/python/paddle/fluid/contrib/reader/README.md @@ -13,3 +13,13 @@ and two types of data format: * label dense_fea,dense_fea sparse_fea,sparse_fea - the svm data format is : * label slot1:fea_sign slot2:fea_sign slot1:fea_sign + +## Distributed reader + +The distributed reader is mainly used by multi-process tasks, it splits the origin batch samples to N sub-batch samples, and the N is equal to the number of processes. The usage is similar to `paddle.batch`. + +Cons: + - It can be operated conveniently so that different processes can read different data. + +Pros: + - Because each process reads the original batch data and then divides the data, the performance may be poor. diff --git a/python/paddle/fluid/contrib/reader/__init__.py b/python/paddle/fluid/contrib/reader/__init__.py index 4cf85ffc166420f117db9576b4d687c96d429e3c..e96acc5682a05606e0fbac689c9fbf87c28ad668 100644 --- a/python/paddle/fluid/contrib/reader/__init__.py +++ b/python/paddle/fluid/contrib/reader/__init__.py @@ -15,5 +15,8 @@ from __future__ import print_function from . import ctr_reader +from .distributed_reader import * -__all__ = ctr_reader.__all__ +__all__ = [] +__all__ += distributed_reader.__all__ +__all__ += ctr_reader.__all__ diff --git a/python/paddle/fluid/contrib/reader/distributed_reader.py b/python/paddle/fluid/contrib/reader/distributed_reader.py new file mode 100644 index 0000000000000000000000000000000000000000..5361f701e718b90f5925878e620ba1ce1e9894e9 --- /dev/null +++ b/python/paddle/fluid/contrib/reader/distributed_reader.py @@ -0,0 +1,66 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function +import os + +__all__ = ["distributed_sampler"] + + +def distributed_sampler(reader, batch_size): + """ + Create a distributed reader. + + :param reader: the data reader to read from. + :type reader: callable + :param batch_size: the size of the batch + :type batch_size: int + """ + + def batch_reader(): + if not os.getenv('PADDLE_TRAINER_ID'): + raise RuntimeError( + "The current program is not in distributed mode.") + + trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) + trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + + def _slice_data(size): + per_node_lines = size // trainer_count + return [ + trainer_id * per_node_lines, (trainer_id + 1) * per_node_lines + ] + + r = reader() + b = [] + + for instance in r: + b.append(instance) + if len(b) == batch_size: + if len(b) >= trainer_count: + begin, end = _slice_data(len(b)) + yield b[begin:end] + b = [] + + if len(b) >= trainer_count: + begin, end = _slice_data(len(b)) + yield b[begin:end] + + # Batch size check + batch_size = int(batch_size) + if batch_size <= 0: + raise ValueError("batch_size should be a positive integeral value, " + "but got batch_size={}".format(batch_size)) + + return batch_reader diff --git a/python/paddle/fluid/contrib/tests/test_distributed_reader.py b/python/paddle/fluid/contrib/tests/test_distributed_reader.py new file mode 100644 index 0000000000000000000000000000000000000000..080012f9aacf09ae6fb01d16a4dc1ee88ec5265e --- /dev/null +++ b/python/paddle/fluid/contrib/tests/test_distributed_reader.py @@ -0,0 +1,49 @@ +# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import unittest +import numpy as np +import paddle.fluid as fluid +import os + + +def data_generator(input_shape=(1, 32, 32), label_range=9): + while True: + img = np.random.random(size=input_shape).astype(np.float32) + label = np.array(np.random.randint(0, label_range)).astype("int64") + yield img, label + + +class TestDistributedReader(unittest.TestCase): + def test_distributed_reader(self): + batch_size = 32 + trainer_num = 4 + os.environ['PADDLE_TRAINER_ID'] = str(0) + os.environ['PADDLE_TRAINERS_NUM'] = str(trainer_num) + + reader = fluid.contrib.reader.distributed_sampler( + data_generator, batch_size=batch_size) + data = next(reader()) + assert len(data) == batch_size // trainer_num,\ + "sub batch size should be {}, but the returned size is {}".format( + batch_size // trainer_num, len(data)) + + os.unsetenv('PADDLE_TRAINER_ID') + os.unsetenv('PADDLE_TRAINERS_NUM') + + +if __name__ == '__main__': + unittest.main()