未验证 提交 d7efcf95 编写于 作者: C chengduo 提交者: GitHub

[Cherry-pick]Add multi process reader (#18116)

* add multi process reader
test=release/1.5
上级 598addf1
...@@ -405,7 +405,7 @@ paddle.fluid.contrib.QuantizeTranspiler.training_transpile (ArgSpec(args=['self' ...@@ -405,7 +405,7 @@ paddle.fluid.contrib.QuantizeTranspiler.training_transpile (ArgSpec(args=['self'
paddle.fluid.contrib.Calibrator.__init__ (ArgSpec(args=['self'], varargs='args', keywords='kwargs', defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.contrib.Calibrator.__init__ (ArgSpec(args=['self'], varargs='args', keywords='kwargs', defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.contrib.Calibrator.sample_data (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '3b8c85ca1e2cf753cc8c90a6c6992958')) paddle.fluid.contrib.Calibrator.sample_data (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '3b8c85ca1e2cf753cc8c90a6c6992958'))
paddle.fluid.contrib.Calibrator.save_int8_model (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754')) paddle.fluid.contrib.Calibrator.save_int8_model (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.contrib.distributed_sampler (ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None), ('document', '9a271cd9700deb6d837ed724ba094315')) paddle.fluid.contrib.distributed_batch_reader (ArgSpec(args=['batch_reader'], varargs=None, keywords=None, defaults=None), ('document', 'b60796eb0a481484dd34e345f0eaa4d5'))
paddle.fluid.contrib.reader.ctr_reader.ctr_reader (ArgSpec(args=['feed_dict', 'file_type', 'file_format', 'dense_slot_index', 'sparse_slot_index', 'capacity', 'thread_num', 'batch_size', 'file_list', 'slots', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b2ebf3de2a6ef1af2c3b88d2db7591ab')) paddle.fluid.contrib.reader.ctr_reader.ctr_reader (ArgSpec(args=['feed_dict', 'file_type', 'file_format', 'dense_slot_index', 'sparse_slot_index', 'capacity', 'thread_num', 'batch_size', 'file_list', 'slots', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b2ebf3de2a6ef1af2c3b88d2db7591ab'))
paddle.fluid.contrib.Compressor.__init__ (ArgSpec(args=['self', 'place', 'scope', 'train_program', 'train_reader', 'train_feed_list', 'train_fetch_list', 'eval_program', 'eval_reader', 'eval_feed_list', 'eval_fetch_list', 'teacher_programs', 'checkpoint_path', 'train_optimizer', 'distiller_optimizer', 'search_space'], varargs=None, keywords=None, defaults=(None, None, None, None, None, None, None, [], None, None, None, None)), ('document', 'c195b3bba26169cff9439e8c467557c0')) paddle.fluid.contrib.Compressor.__init__ (ArgSpec(args=['self', 'place', 'scope', 'train_program', 'train_reader', 'train_feed_list', 'train_fetch_list', 'eval_program', 'eval_reader', 'eval_feed_list', 'eval_fetch_list', 'teacher_programs', 'checkpoint_path', 'train_optimizer', 'distiller_optimizer', 'search_space'], varargs=None, keywords=None, defaults=(None, None, None, None, None, None, None, [], None, None, None, None)), ('document', 'c195b3bba26169cff9439e8c467557c0'))
paddle.fluid.contrib.Compressor.config (ArgSpec(args=['self', 'config_file'], varargs=None, keywords=None, defaults=None), ('document', '780d9c007276ccbb95b292400d7807b0')) paddle.fluid.contrib.Compressor.config (ArgSpec(args=['self', 'config_file'], varargs=None, keywords=None, defaults=None), ('document', '780d9c007276ccbb95b292400d7807b0'))
......
...@@ -16,10 +16,10 @@ and two types of data format: ...@@ -16,10 +16,10 @@ and two types of data format:
## Distributed reader ## Distributed reader
The distributed reader is mainly used by multi-process tasks, it splits the origin batch samples to N sub-batch samples, and the N is equal to the number of processes. The usage is similar to `paddle.batch`. The distributed reader is mainly used by multi-process tasks, and the input must be a batch reader.
Cons: Cons:
- It can be operated conveniently so that different processes can read different data. - It can be operated conveniently so that different processes can read different data.
Pros: Pros:
- Because each process reads the original batch data and then divides the data, the performance may be poor. - If batch_reader produces training data, and batch_reader loads or preprocesses data for a long time, this data reading method may be slower.
...@@ -15,52 +15,51 @@ ...@@ -15,52 +15,51 @@
from __future__ import print_function from __future__ import print_function
import os import os
__all__ = ["distributed_sampler"] __all__ = ["distributed_batch_reader"]
def distributed_sampler(reader, batch_size): def distributed_batch_reader(batch_reader):
""" """
Create a distributed reader. Create a reader for multi-process training. The input must be a batch reader.
:param reader: the data reader to read from. Args:
:type reader: callable batch_reader (callable): The input reader should be a batch reader.
:param batch_size: the size of the batch
:type batch_size: int
"""
def batch_reader():
if not os.getenv('PADDLE_TRAINER_ID'):
raise RuntimeError(
"The current program is not in distributed mode.")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0")) Examples:
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
def _slice_data(size): .. code-block:: python
per_node_lines = size // trainer_count import paddle
return [ import paddle.fluid as fluid
trainer_id * per_node_lines, (trainer_id + 1) * per_node_lines
]
r = reader() train_reader = paddle.batch(paddle.dataset.mnist.train(),
b = [] batch_size=32,drop_last=True)
train_reader = fluid.contrib.reader.distributed_batch_reader(
train_reader)
for instance in r: """
b.append(instance) trainers_num = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
if len(b) == batch_size: trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0))
if len(b) >= trainer_count: assert trainer_id < trainers_num
begin, end = _slice_data(len(b))
yield b[begin:end]
b = []
if len(b) >= trainer_count: def decorate_for_multi_process():
begin, end = _slice_data(len(b)) if trainers_num > 1:
yield b[begin:end] print("start data reader (trainers_num: {}, trainer_id: {})".format(
trainers_num, trainer_id))
# Batch size check train_data, idx = None, 1
batch_size = int(batch_size) for batch_id, data in enumerate(batch_reader()):
if batch_size <= 0: if trainers_num > 1:
raise ValueError("batch_size should be a positive integeral value, " if idx < trainers_num:
"but got batch_size={}".format(batch_size)) if idx == trainer_id + 1:
train_data = data
idx += 1
else:
if idx == trainer_id + 1:
train_data = data
assert train_data is not None, "train data should not be None."
yield train_data
train_data, idx = None, 1
else:
yield data
return batch_reader return decorate_for_multi_process
...@@ -20,26 +20,21 @@ import paddle.fluid as fluid ...@@ -20,26 +20,21 @@ import paddle.fluid as fluid
import os import os
def data_generator(input_shape=(1, 32, 32), label_range=9): def data_generator():
while True: data = [0, 1, 2, 3]
img = np.random.random(size=input_shape).astype(np.float32) for val in data:
label = np.array(np.random.randint(0, label_range)).astype("int64") yield val
yield img, label
class TestDistributedReader(unittest.TestCase): class TestDistributedReader(unittest.TestCase):
def test_distributed_reader(self): def test_distributed_reader(self):
batch_size = 32
trainer_num = 4 trainer_num = 4
os.environ['PADDLE_TRAINER_ID'] = str(0) os.environ['PADDLE_TRAINER_ID'] = str(1)
os.environ['PADDLE_TRAINERS_NUM'] = str(trainer_num) os.environ['PADDLE_TRAINERS_NUM'] = str(trainer_num)
reader = fluid.contrib.reader.distributed_sampler( reader = fluid.contrib.reader.distributed_batch_reader(data_generator)
data_generator, batch_size=batch_size)
data = next(reader()) data = next(reader())
assert len(data) == batch_size // trainer_num,\ assert data == 1
"sub batch size should be {}, but the returned size is {}".format(
batch_size // trainer_num, len(data))
os.unsetenv('PADDLE_TRAINER_ID') os.unsetenv('PADDLE_TRAINER_ID')
os.unsetenv('PADDLE_TRAINERS_NUM') os.unsetenv('PADDLE_TRAINERS_NUM')
......
...@@ -198,7 +198,7 @@ class TestMultiProcessReader(unittest.TestCase): ...@@ -198,7 +198,7 @@ class TestMultiProcessReader(unittest.TestCase):
results.append(data) results.append(data)
self.assertEqual(sorted(self.samples), sorted(results)) self.assertEqual(sorted(self.samples), sorted(results))
def test_multi_process_reader(self): def test_distributed_batch_reader(self):
self.reader_test(use_pipe=False) self.reader_test(use_pipe=False)
self.reader_test(use_pipe=True) self.reader_test(use_pipe=True)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册