From 5588b923f3010cfe82e22fc1db151f1ab62cd2dd Mon Sep 17 00:00:00 2001 From: chengduo Date: Wed, 19 Jun 2019 20:58:56 +0800 Subject: [PATCH] Add multi process reader (#18115) * add multi process reader test=develop --- paddle/fluid/API.spec | 2 +- python/paddle/fluid/contrib/reader/README.md | 4 +- .../contrib/reader/distributed_reader.py | 75 +++++++++---------- .../contrib/tests/test_distributed_reader.py | 19 ++--- python/paddle/reader/tests/decorator_test.py | 2 +- 5 files changed, 48 insertions(+), 54 deletions(-) diff --git a/paddle/fluid/API.spec b/paddle/fluid/API.spec index e7c7d2d07b..1bde2b37a6 100644 --- a/paddle/fluid/API.spec +++ b/paddle/fluid/API.spec @@ -406,7 +406,7 @@ paddle.fluid.contrib.QuantizeTranspiler.training_transpile (ArgSpec(args=['self' paddle.fluid.contrib.Calibrator.__init__ (ArgSpec(args=['self'], varargs='args', keywords='kwargs', defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.contrib.Calibrator.sample_data (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '3b8c85ca1e2cf753cc8c90a6c6992958')) paddle.fluid.contrib.Calibrator.save_int8_model (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) -paddle.fluid.contrib.distributed_sampler (ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None), ('document', '9a271cd9700deb6d837ed724ba094315')) +paddle.fluid.contrib.distributed_batch_reader (ArgSpec(args=['batch_reader'], varargs=None, keywords=None, defaults=None), ('document', 'b60796eb0a481484dd34e345f0eaa4d5')) paddle.fluid.contrib.reader.ctr_reader.ctr_reader (ArgSpec(args=['feed_dict', 'file_type', 'file_format', 'dense_slot_index', 'sparse_slot_index', 'capacity', 'thread_num', 'batch_size', 'file_list', 'slots', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b2ebf3de2a6ef1af2c3b88d2db7591ab')) paddle.fluid.contrib.Compressor.__init__ 
(ArgSpec(args=['self', 'place', 'scope', 'train_program', 'train_reader', 'train_feed_list', 'train_fetch_list', 'eval_program', 'eval_reader', 'eval_feed_list', 'eval_fetch_list', 'teacher_programs', 'checkpoint_path', 'train_optimizer', 'distiller_optimizer', 'search_space'], varargs=None, keywords=None, defaults=(None, None, None, None, None, None, None, [], None, None, None, None)), ('document', 'c195b3bba26169cff9439e8c467557c0')) paddle.fluid.contrib.Compressor.config (ArgSpec(args=['self', 'config_file'], varargs=None, keywords=None, defaults=None), ('document', '780d9c007276ccbb95b292400d7807b0')) diff --git a/python/paddle/fluid/contrib/reader/README.md b/python/paddle/fluid/contrib/reader/README.md index 07c5430916..f043a17493 100644 --- a/python/paddle/fluid/contrib/reader/README.md +++ b/python/paddle/fluid/contrib/reader/README.md @@ -16,10 +16,10 @@ and two types of data format: ## Distributed reader -The distributed reader is mainly used by multi-process tasks, it splits the origin batch samples to N sub-batch samples, and the N is equal to the number of processes. The usage is similar to `paddle.batch`. +The distributed reader is mainly used by multi-process tasks, and the input must be a batch reader. Cons: - It can be operated conveniently so that different processes can read different data. Pros: - - Because each process reads the original batch data and then divides the data, the performance may be poor. + - If batch_reader takes a long time to load or preprocess its training data, this data reading method may be slow.
diff --git a/python/paddle/fluid/contrib/reader/distributed_reader.py b/python/paddle/fluid/contrib/reader/distributed_reader.py index 5361f701e7..ecee769218 100644 --- a/python/paddle/fluid/contrib/reader/distributed_reader.py +++ b/python/paddle/fluid/contrib/reader/distributed_reader.py @@ -15,52 +15,51 @@ from __future__ import print_function import os -__all__ = ["distributed_sampler"] +__all__ = ["distributed_batch_reader"] -def distributed_sampler(reader, batch_size): +def distributed_batch_reader(batch_reader): """ - Create a distributed reader. + Create a reader for multi-process training. The input must be a batch reader. - :param reader: the data reader to read from. - :type reader: callable - :param batch_size: the size of the batch - :type batch_size: int - """ - - def batch_reader(): - if not os.getenv('PADDLE_TRAINER_ID'): - raise RuntimeError( - "The current program is not in distributed mode.") + Args: + batch_reader (callable): The input reader should be a batch reader. - trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) - trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1")) + Examples: - def _slice_data(size): - per_node_lines = size // trainer_count - return [ - trainer_id * per_node_lines, (trainer_id + 1) * per_node_lines - ] + .. 
code-block:: python + import paddle + import paddle.fluid as fluid - r = reader() - b = [] + train_reader = paddle.batch(paddle.dataset.mnist.train(), + batch_size=32,drop_last=True) + train_reader = fluid.contrib.reader.distributed_batch_reader( + train_reader) - for instance in r: - b.append(instance) - if len(b) == batch_size: - if len(b) >= trainer_count: - begin, end = _slice_data(len(b)) - yield b[begin:end] - b = [] + """ + trainers_num = int(os.environ.get('PADDLE_TRAINERS_NUM', 1)) + trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0)) + assert trainer_id < trainers_num - if len(b) >= trainer_count: - begin, end = _slice_data(len(b)) - yield b[begin:end] + def decorate_for_multi_process(): + if trainers_num > 1: + print("start data reader (trainers_num: {}, trainer_id: {})".format( + trainers_num, trainer_id)) - # Batch size check - batch_size = int(batch_size) - if batch_size <= 0: - raise ValueError("batch_size should be a positive integeral value, " - "but got batch_size={}".format(batch_size)) + train_data, idx = None, 1 + for batch_id, data in enumerate(batch_reader()): + if trainers_num > 1: + if idx < trainers_num: + if idx == trainer_id + 1: + train_data = data + idx += 1 + else: + if idx == trainer_id + 1: + train_data = data + assert train_data is not None, "train data should not be None." 
+ yield train_data + train_data, idx = None, 1 + else: + yield data - return batch_reader + return decorate_for_multi_process diff --git a/python/paddle/fluid/contrib/tests/test_distributed_reader.py b/python/paddle/fluid/contrib/tests/test_distributed_reader.py index 080012f9aa..51e1455e71 100644 --- a/python/paddle/fluid/contrib/tests/test_distributed_reader.py +++ b/python/paddle/fluid/contrib/tests/test_distributed_reader.py @@ -20,26 +20,21 @@ import paddle.fluid as fluid import os -def data_generator(input_shape=(1, 32, 32), label_range=9): - while True: - img = np.random.random(size=input_shape).astype(np.float32) - label = np.array(np.random.randint(0, label_range)).astype("int64") - yield img, label +def data_generator(): + data = [0, 1, 2, 3] + for val in data: + yield val class TestDistributedReader(unittest.TestCase): def test_distributed_reader(self): - batch_size = 32 trainer_num = 4 - os.environ['PADDLE_TRAINER_ID'] = str(0) + os.environ['PADDLE_TRAINER_ID'] = str(1) os.environ['PADDLE_TRAINERS_NUM'] = str(trainer_num) - reader = fluid.contrib.reader.distributed_sampler( - data_generator, batch_size=batch_size) + reader = fluid.contrib.reader.distributed_batch_reader(data_generator) data = next(reader()) - assert len(data) == batch_size // trainer_num,\ - "sub batch size should be {}, but the returned size is {}".format( - batch_size // trainer_num, len(data)) + assert data == 1 os.unsetenv('PADDLE_TRAINER_ID') os.unsetenv('PADDLE_TRAINERS_NUM') diff --git a/python/paddle/reader/tests/decorator_test.py b/python/paddle/reader/tests/decorator_test.py index a9dddbbcc8..ef07640ed8 100644 --- a/python/paddle/reader/tests/decorator_test.py +++ b/python/paddle/reader/tests/decorator_test.py @@ -198,7 +198,7 @@ class TestMultiProcessReader(unittest.TestCase): results.append(data) self.assertEqual(sorted(self.samples), sorted(results)) - def test_multi_process_reader(self): + def test_distributed_batch_reader(self): self.reader_test(use_pipe=False) 
self.reader_test(use_pipe=True) -- GitLab