未验证 提交 5588b923 编写于 作者: C chengduo 提交者: GitHub

Add multi process reader (#18115)

* add multi process reader
test=develop
上级 a9dc534f
......@@ -406,7 +406,7 @@ paddle.fluid.contrib.QuantizeTranspiler.training_transpile (ArgSpec(args=['self'
paddle.fluid.contrib.Calibrator.__init__ (ArgSpec(args=['self'], varargs='args', keywords='kwargs', defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.contrib.Calibrator.sample_data (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '3b8c85ca1e2cf753cc8c90a6c6992958'))
paddle.fluid.contrib.Calibrator.save_int8_model (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.contrib.distributed_sampler (ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None), ('document', '9a271cd9700deb6d837ed724ba094315'))
paddle.fluid.contrib.distributed_batch_reader (ArgSpec(args=['batch_reader'], varargs=None, keywords=None, defaults=None), ('document', 'b60796eb0a481484dd34e345f0eaa4d5'))
paddle.fluid.contrib.reader.ctr_reader.ctr_reader (ArgSpec(args=['feed_dict', 'file_type', 'file_format', 'dense_slot_index', 'sparse_slot_index', 'capacity', 'thread_num', 'batch_size', 'file_list', 'slots', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b2ebf3de2a6ef1af2c3b88d2db7591ab'))
paddle.fluid.contrib.Compressor.__init__ (ArgSpec(args=['self', 'place', 'scope', 'train_program', 'train_reader', 'train_feed_list', 'train_fetch_list', 'eval_program', 'eval_reader', 'eval_feed_list', 'eval_fetch_list', 'teacher_programs', 'checkpoint_path', 'train_optimizer', 'distiller_optimizer', 'search_space'], varargs=None, keywords=None, defaults=(None, None, None, None, None, None, None, [], None, None, None, None)), ('document', 'c195b3bba26169cff9439e8c467557c0'))
paddle.fluid.contrib.Compressor.config (ArgSpec(args=['self', 'config_file'], varargs=None, keywords=None, defaults=None), ('document', '780d9c007276ccbb95b292400d7807b0'))
......
......@@ -16,10 +16,10 @@ and two types of data format:
## Distributed reader
The distributed reader is mainly used by multi-process tasks: it splits the original batch of samples into N sub-batches, where N equals the number of processes. The input must be a batch reader, and the usage is similar to `paddle.batch`.
Pros:
- It can be operated conveniently so that different processes can read different data.
Cons:
- Because each process reads the original batch data and then divides the data, the performance may be poor.
- If the batch reader takes a long time to load or preprocess its data, this reading method may be slow.
......@@ -15,52 +15,51 @@
from __future__ import print_function
import os
__all__ = ["distributed_sampler"]
__all__ = ["distributed_batch_reader"]
def distributed_sampler(reader, batch_size):
def distributed_batch_reader(batch_reader):
"""
Create a distributed reader.
Create a reader for multi-process training. The input must be a batch reader.
:param reader: the data reader to read from.
:type reader: callable
:param batch_size: the size of the batch
:type batch_size: int
"""
def batch_reader():
if not os.getenv('PADDLE_TRAINER_ID'):
raise RuntimeError(
"The current program is not in distributed mode.")
Args:
batch_reader (callable): The input reader should be a batch reader.
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
Examples:
def _slice_data(size):
per_node_lines = size // trainer_count
return [
trainer_id * per_node_lines, (trainer_id + 1) * per_node_lines
]
.. code-block:: python
import paddle
import paddle.fluid as fluid
r = reader()
b = []
train_reader = paddle.batch(paddle.dataset.mnist.train(),
batch_size=32,drop_last=True)
train_reader = fluid.contrib.reader.distributed_batch_reader(
train_reader)
for instance in r:
b.append(instance)
if len(b) == batch_size:
if len(b) >= trainer_count:
begin, end = _slice_data(len(b))
yield b[begin:end]
b = []
"""
trainers_num = int(os.environ.get('PADDLE_TRAINERS_NUM', 1))
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", 0))
assert trainer_id < trainers_num
if len(b) >= trainer_count:
begin, end = _slice_data(len(b))
yield b[begin:end]
def decorate_for_multi_process():
if trainers_num > 1:
print("start data reader (trainers_num: {}, trainer_id: {})".format(
trainers_num, trainer_id))
# Batch size check
batch_size = int(batch_size)
if batch_size <= 0:
raise ValueError("batch_size should be a positive integeral value, "
"but got batch_size={}".format(batch_size))
train_data, idx = None, 1
for batch_id, data in enumerate(batch_reader()):
if trainers_num > 1:
if idx < trainers_num:
if idx == trainer_id + 1:
train_data = data
idx += 1
else:
if idx == trainer_id + 1:
train_data = data
assert train_data is not None, "train data should not be None."
yield train_data
train_data, idx = None, 1
else:
yield data
return batch_reader
return decorate_for_multi_process
......@@ -20,26 +20,21 @@ import paddle.fluid as fluid
import os
def data_generator():
    """Yield the integers 0..3 one at a time (a toy reader for the test)."""
    data = [0, 1, 2, 3]
    for val in data:
        yield val
class TestDistributedReader(unittest.TestCase):
    """Check that distributed_batch_reader yields only this trainer's items."""

    def test_distributed_reader(self):
        trainer_num = 4
        # Pretend to be trainer 1 of 4 so the reader slices accordingly.
        os.environ['PADDLE_TRAINER_ID'] = str(1)
        os.environ['PADDLE_TRAINERS_NUM'] = str(trainer_num)

        reader = fluid.contrib.reader.distributed_batch_reader(data_generator)
        data = next(reader())
        # data_generator yields 0,1,2,3; trainer 1 owns the second item.
        assert data == 1

        # os.unsetenv does not update os.environ, which later tests may read;
        # pop the keys from os.environ instead.
        os.environ.pop('PADDLE_TRAINER_ID', None)
        os.environ.pop('PADDLE_TRAINERS_NUM', None)
......
......@@ -198,7 +198,7 @@ class TestMultiProcessReader(unittest.TestCase):
results.append(data)
self.assertEqual(sorted(self.samples), sorted(results))
def test_distributed_batch_reader(self):
    """Run the multi-process reader round-trip both without and with a pipe."""
    self.reader_test(use_pipe=False)
    self.reader_test(use_pipe=True)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册