Unverified · Commit 65c6d2ef authored by kangguangli, committed by GitHub

remove DataFeeder.feed_parallel and decorate_reader (#51777)

* remove DataFeeder.feed_parallel and decorate_reader

* fix CI

* fix CI
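A minimal sketch of the replacement pattern this PR applies inside GeneratorLoader (wrap the reader yourself and convert each mini-batch with `DataFeeder.feed`); the `sample_reader`, `decorated_reader`, and `batch_size` names below are illustrative and not taken from this change:

```python
import numpy as np
import paddle
import paddle.fluid as fluid

paddle.enable_static()

x = paddle.static.data(name='x', shape=[None, 4], dtype='float32')
y = paddle.static.data(name='y', shape=[None, 4], dtype='float32')
z = paddle.add(x, y)

place = fluid.CPUPlace()
feeder = fluid.DataFeeder(feed_list=[x, y], place=place)

def sample_reader():
    # each item is one sample: (x_value, y_value)
    for i in range(8):
        yield np.full([4], i, dtype='float32'), np.ones([4], dtype='float32')

# Replacement for feeder.decorate_reader(sample_reader, multi_devices=False):
# batch the samples yourself and convert each mini-batch with feeder.feed.
def decorated_reader(batch_size=4):
    batch = []
    for sample in sample_reader():
        batch.append(sample)
        if len(batch) == batch_size:
            yield feeder.feed(batch)  # {variable name: LoDTensor}
            batch = []

exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
for feed_dict in decorated_reader():
    print(exe.run(fluid.default_main_program(), feed=feed_dict, fetch_list=[z]))
```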
Parent 720b14e3
......@@ -451,85 +451,6 @@ class DataFeeder:
ret_dict[each_name] = each_converter.done()
return ret_dict
def feed_parallel(self, iterable, num_places=None):
"""
Similar to the feed function, feed_parallel is used with multiple devices (CPU|GPU).
Here :code:`iterable` is a list of python generators. The data returned by each
generator in the list will be fed into a separate device.
Parameters:
iterable (list|tuple): list of user-defined python generators. The number of
elements should match :code:`num_places`.
num_places (int, optional): the number of devices. If not provided (None),
all available devices on the machine will be used. Default None.
Returns:
:code:`generator`: a :code:`generator` that generates dicts of (variable name, converted tensor) pairs;
the total number of dicts generated matches :code:`num_places`.
.. note::
The number of devices, :code:`num_places`, should equal the number of generators (elements of :code:`iterable`).
Example:
.. code-block:: python
import numpy as np
import paddle
import paddle.fluid as fluid
def generate_reader(batch_size, base=0, factor=1):
def _reader():
for i in range(batch_size):
yield np.ones([4]) * factor + base, np.ones([4]) * factor + base + 5
return _reader()
x = paddle.static.data(name='x', shape=[None, 2, 2])
y = paddle.static.data(name='y', shape=[None, 2, 2], dtype='float32')
z = paddle.add(x, y)
feeder = fluid.DataFeeder(['x','y'], fluid.CPUPlace())
place_num = 2
places = [fluid.CPUPlace() for x in range(place_num)]
data = []
exe = fluid.Executor(fluid.CPUPlace())
exe.run(fluid.default_startup_program())
program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(places=places)
# print a sample feed_parallel result
# for item in list(feeder.feed_parallel([generate_reader(5, 0, 1), generate_reader(3, 10, 2)], 2)):
# print(item['x'])
# print(item['y'])
reader_list = [generate_reader(5, 0, 1), generate_reader(3, 10, 2)]
res = exe.run(program=program, feed=list(feeder.feed_parallel(reader_list, 2)), fetch_list=[z])
print(res)
"""
if isinstance(self.place, core.CUDAPlace):
places = [
core.CUDAPlace(i)
for i in range(self._get_number_of_places_(num_places))
]
else:
places = [
core.CPUPlace()
for _ in range(self._get_number_of_places_(num_places))
]
if len(iterable) != len(places):
raise ValueError(
"feed_parallel takes multiple mini-batches. Each "
"mini-batch will be feed on each device. The "
"number of devices and number of mini-batches "
"must be same."
)
place = self.place
for p, batch in zip(places, iterable):
self.place = p
yield self.feed(batch)
self.place = place
def _get_number_of_places_(self, num_places):
if num_places is not None:
return int(num_places)
......@@ -537,88 +458,3 @@ class DataFeeder:
return len(_cuda_ids())
else:
return _cpu_num()
def decorate_reader(
self, reader, multi_devices, num_places=None, drop_last=True
):
"""
Decorate the reader (generator) to fit multiple devices. The reader generates
multiple mini-batches. Each mini-batch will be fed to a single device.
Parameters:
reader(generator): a user-defined python generator used to get a :code:`mini-batch` of data.
A :code:`mini-batch` can be regarded as a python generator that returns batches of input
entities, like :code:`_mini_batch` in the code example below.
multi_devices(bool): indicates whether to use multiple devices.
num_places(int, optional): if :code:`multi_devices` is True, you can specify the number
of devices (CPU|GPU) to use; if :code:`num_places` is None, the function will use all the
devices of the current machine. Default None.
drop_last(bool, optional): whether to drop the last round of data if it is not enough to
feed all devices. Default True.
Returns:
:code:`generator`: a new :code:`generator` which returns converted dicts that can be fed into the Executor.
Raises:
:code:`ValueError`: if drop_last is False and the data cannot be evenly distributed across the devices.
Example:
.. code-block:: python
import numpy as np
import paddle
import paddle.fluid as fluid
import paddle.fluid.compiler as compiler
def reader():
def _mini_batch(batch_size):
for i in range(batch_size):
yield np.random.random([16]).astype('float32'), np.random.randint(10, size=[1])
for _ in range(10):
yield _mini_batch(np.random.randint(1, 10))
place_num = 3
places = [fluid.CPUPlace() for _ in range(place_num)]
# a simple network sample
data = paddle.static.data(name='data', shape=[None, 4, 4], dtype='float32')
label = paddle.static.data(name='label', shape=[None, 1], dtype='int64')
hidden = paddle.static.nn.fc(x=data, size=10)
feeder = fluid.DataFeeder(place=places[0], feed_list=[data, label])
reader = feeder.decorate_reader(reader, multi_devices=True, num_places=3, drop_last=True)
exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program())
compiled_prog = compiler.CompiledProgram(
fluid.default_main_program()).with_data_parallel(places=places)
for i, feed_data in enumerate(reader()):
# print feed_data if you like
# print(i, feed_data)
ret = exe.run(compiled_prog, feed=feed_data, fetch_list=[hidden])
print(ret)
"""
def __reader_creator__():
if not multi_devices:
for item in reader():
yield self.feed(item)
else:
num = self._get_number_of_places_(num_places)
item = []
for batch in reader():
item.append(batch)
if len(item) == num:
yield list(self.feed_parallel(item, num))
item = []
if not drop_last and len(item) != 0:
raise ValueError(
"The data batch which cannot fit for devices will be "
"dropped is not implementation. Other strategies are "
"not implemented"
)
return __reader_creator__
......@@ -1550,7 +1550,12 @@ class GeneratorLoader(DataLoaderBase):
feeder = DataFeeder(
feed_list=self._feed_list, place=core.CPUPlace()
)
paddle_reader = feeder.decorate_reader(reader, multi_devices=False)
def decorate_reader():
for item in reader():
yield feeder.feed(item)
paddle_reader = decorate_reader
def __tensor_reader_impl__():
for slots in paddle_reader():
......
......@@ -45,25 +45,6 @@ class FeedDataReader:
feed_data[key] = value
return feed_data
def _feed_parallel_executor(self, device_num):
feed_data = []
for _ in range(device_num):
feed_data.append(self._feed_executor())
return feed_data
def get_next(self, exe, program):
result = []
assert isinstance(exe, fluid.Executor), "exe must be Executor"
use_cuda = isinstance(exe.place, fluid.CUDAPlace)
if isinstance(program, fluid.CompiledProgram):
use_executor = True
device_num = 1
else:
use_executor = True
device_num = 1
if use_executor:
return self._feed_executor()
else:
return self._feed_parallel_executor(device_num)
......@@ -76,7 +76,7 @@ class BuildIrMemOptBase(unittest.TestCase):
# execution
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
feeder = fluid.DataFeeder(feed_list=[data, label], place=place)
reader = feeder.decorate_reader(self.train_reader, multi_devices=True)
reader = feeder.feed(self.train_reader())
exe = fluid.Executor(place)
exe.run(fluid.default_startup_program())
......
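The hunk above drops the `multi_devices=True` path of `decorate_reader`. Where a call site still needs one feed dict per device for a program compiled with `with_data_parallel` (the structure `feed_parallel` used to yield, as in the removed docstring example), the list can be assembled manually with one DataFeeder per place. A rough sketch under that assumption; the `make_batch` helper and all values are illustrative:

```python
import numpy as np
import paddle
import paddle.fluid as fluid

paddle.enable_static()

x = paddle.static.data(name='x', shape=[None, 4], dtype='float32')
y = paddle.add(x, x)

place_num = 2
places = [fluid.CPUPlace() for _ in range(place_num)]

# One DataFeeder per device replaces feed_parallel's internal place switching.
feeders = [fluid.DataFeeder(feed_list=[x], place=p) for p in places]

def make_batch(base, batch_size=3):
    # one mini-batch: a list of single-field samples
    return [(np.full([4], base + i, dtype='float32'),) for i in range(batch_size)]

exe = fluid.Executor(places[0])
exe.run(fluid.default_startup_program())
compiled = fluid.CompiledProgram(
    fluid.default_main_program()).with_data_parallel(places=places)

# A list with one feed dict per device, as feed_parallel used to yield.
feed = [f.feed(make_batch(i * 10)) for i, f in enumerate(feeders)]
print(exe.run(program=compiled, feed=feed, fetch_list=[y]))
```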
......@@ -47,7 +47,7 @@ def train(network, use_cuda, batch_size=32, pass_num=2):
place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
feeder = fluid.DataFeeder(feed_list=[data, label], place=place)
reader = feeder.decorate_reader(train_reader, multi_devices=False)
reader = feeder.feed(train_reader())
exe = fluid.Executor(place)
fluid.default_startup_program().random_seed = 1
......