未验证 提交 93222161 编写于 作者: C chengduo 提交者: GitHub

Add data distributed_sampler (#17573)

* add data parallel batch
上级 1670db5e
......@@ -400,6 +400,7 @@ paddle.fluid.contrib.QuantizeTranspiler.training_transpile (ArgSpec(args=['self'
paddle.fluid.contrib.Calibrator.__init__ (ArgSpec(args=['self'], varargs='args', keywords='kwargs', defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.contrib.Calibrator.sample_data (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '3b8c85ca1e2cf753cc8c90a6c6992958'))
paddle.fluid.contrib.Calibrator.save_int8_model (ArgSpec(args=['self'], varargs=None, keywords=None, defaults=None), ('document', '6adf97f83acf6453d4a6a4b1070f3754'))
paddle.fluid.contrib.distributed_sampler (ArgSpec(args=['reader', 'batch_size'], varargs=None, keywords=None, defaults=None), ('document', '9a271cd9700deb6d837ed724ba094315'))
paddle.fluid.contrib.reader.ctr_reader.ctr_reader (ArgSpec(args=['feed_dict', 'file_type', 'file_format', 'dense_slot_index', 'sparse_slot_index', 'capacity', 'thread_num', 'batch_size', 'file_list', 'slots', 'name'], varargs=None, keywords=None, defaults=(None,)), ('document', 'b2ebf3de2a6ef1af2c3b88d2db7591ab'))
paddle.fluid.contrib.Compressor.__init__ (ArgSpec(args=['self', 'place', 'scope', 'train_program', 'train_reader', 'train_feed_list', 'train_fetch_list', 'eval_program', 'eval_reader', 'eval_feed_list', 'eval_fetch_list', 'teacher_programs', 'checkpoint_path', 'train_optimizer', 'distiller_optimizer'], varargs=None, keywords=None, defaults=(None, None, None, None, None, None, None, [], './checkpoints', None, None)), ('document', '31ae143830c9bf6b43547dd546c5ba80'))
paddle.fluid.contrib.Compressor.config (ArgSpec(args=['self', 'config_file'], varargs=None, keywords=None, defaults=None), ('document', '780d9c007276ccbb95b292400d7807b0'))
......
......@@ -13,3 +13,13 @@ and two types of data format:
* label dense_fea,dense_fea sparse_fea,sparse_fea
- the svm data format is :
* label slot1:fea_sign slot2:fea_sign slot1:fea_sign
## Distributed reader
The distributed reader is mainly used by multi-process tasks, it splits the origin batch samples to N sub-batch samples, and the N is equal to the number of processes. The usage is similar to `paddle.batch`.
Cons:
- It can be operated conveniently so that different processes can read different data.
Pros:
- Because each process reads the original batch data and then divides the data, the performance may be poor.
......@@ -15,5 +15,8 @@
from __future__ import print_function
from . import ctr_reader
from .distributed_reader import *
__all__ = ctr_reader.__all__
__all__ = []
__all__ += distributed_reader.__all__
__all__ += ctr_reader.__all__
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import os
__all__ = ["distributed_sampler"]
def distributed_sampler(reader, batch_size):
"""
Create a distributed reader.
:param reader: the data reader to read from.
:type reader: callable
:param batch_size: the size of the batch
:type batch_size: int
"""
def batch_reader():
if not os.getenv('PADDLE_TRAINER_ID'):
raise RuntimeError(
"The current program is not in distributed mode.")
trainer_id = int(os.getenv("PADDLE_TRAINER_ID", "0"))
trainer_count = int(os.getenv("PADDLE_TRAINERS_NUM", "1"))
def _slice_data(size):
per_node_lines = size // trainer_count
return [
trainer_id * per_node_lines, (trainer_id + 1) * per_node_lines
]
r = reader()
b = []
for instance in r:
b.append(instance)
if len(b) == batch_size:
if len(b) >= trainer_count:
begin, end = _slice_data(len(b))
yield b[begin:end]
b = []
if len(b) >= trainer_count:
begin, end = _slice_data(len(b))
yield b[begin:end]
# Batch size check
batch_size = int(batch_size)
if batch_size <= 0:
raise ValueError("batch_size should be a positive integeral value, "
"but got batch_size={}".format(batch_size))
return batch_reader
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import paddle.fluid as fluid
import os
def data_generator(input_shape=(1, 32, 32), label_range=9):
while True:
img = np.random.random(size=input_shape).astype(np.float32)
label = np.array(np.random.randint(0, label_range)).astype("int64")
yield img, label
class TestDistributedReader(unittest.TestCase):
def test_distributed_reader(self):
batch_size = 32
trainer_num = 4
os.environ['PADDLE_TRAINER_ID'] = str(0)
os.environ['PADDLE_TRAINERS_NUM'] = str(trainer_num)
reader = fluid.contrib.reader.distributed_sampler(
data_generator, batch_size=batch_size)
data = next(reader())
assert len(data) == batch_size // trainer_num,\
"sub batch size should be {}, but the returned size is {}".format(
batch_size // trainer_num, len(data))
os.unsetenv('PADDLE_TRAINER_ID')
os.unsetenv('PADDLE_TRAINERS_NUM')
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册