From 65c6d2ef2d674becffea41c990cb6aa0c3f555a2 Mon Sep 17 00:00:00 2001 From: kangguangli Date: Wed, 22 Mar 2023 10:31:14 +0800 Subject: [PATCH] remove DataFeeder.feed_parallel and decorate_reader (#51777) * remove DataFeeder.feed_parallel and decorate_reader * fix CI * fix CI --- python/paddle/fluid/data_feeder.py | 164 ------------------ python/paddle/fluid/reader.py | 7 +- .../fluid/tests/unittests/feed_data_reader.py | 21 +-- .../unittests/ir_memory_optimize_net_base.py | 2 +- .../test_eager_deletion_dynamic_rnn_base.py | 2 +- 5 files changed, 9 insertions(+), 187 deletions(-) diff --git a/python/paddle/fluid/data_feeder.py b/python/paddle/fluid/data_feeder.py index e74e336ab59..e9fcd8ce127 100644 --- a/python/paddle/fluid/data_feeder.py +++ b/python/paddle/fluid/data_feeder.py @@ -451,85 +451,6 @@ class DataFeeder: ret_dict[each_name] = each_converter.done() return ret_dict - def feed_parallel(self, iterable, num_places=None): - """ - Similar with feed function, feed_parallel is used with multiple devices (CPU|GPU). - Here :code:`iterable` is a list of python generators. The data return by each - generator in the list will be fed into a separate device. - - Parameters: - iterable (list|tuple): list of user-defined python generators. The element - number should match the :code:`num_places`. - num_places (int, optional): the number of devices. If not provided (None), - all available devices on the machine will be used. Default None. - - Returns: - :code:`generator`: a :code:`generator` that generate dict which contains (variable name - converted tensor) pairs, - the total number of dicts will be generated matches with the :code:`num_places` - - .. note:: - The number of devices - :code:`num_places` should equal to the generator (element of :code:`iterable` ) number - - Example: - .. code-block:: python - - import numpy as np - import paddle.fluid as fluid - - def generate_reader(batch_size, base=0, factor=1): - def _reader(): - for i in range(batch_size): - yield np.ones([4]) * factor + base, np.ones([4]) * factor + base + 5 - return _reader() - - x = paddle.static.data(name='x', shape=[None, 2, 2]) - y = paddle.static.data(name='y', shape=[None, 2, 2], dtype='float32') - - z = paddle.add(x, y) - - feeder = fluid.DataFeeder(['x','y'], fluid.CPUPlace()) - place_num = 2 - places = [fluid.CPUPlace() for x in range(place_num)] - data = [] - exe = fluid.Executor(fluid.CPUPlace()) - exe.run(fluid.default_startup_program()) - program = fluid.CompiledProgram(fluid.default_main_program()).with_data_parallel(places=places) - - # print sample feed_parallel r result - # for item in list(feeder.feed_parallel([generate_reader(5, 0, 1), generate_reader(3, 10, 2)], 2)): - # print(item['x']) - # print(item['y']) - - reader_list = [generate_reader(5, 0, 1), generate_reader(3, 10, 2)] - res = exe.run(program=program, feed=list(feeder.feed_parallel(reader_list, 2)), fetch_list=[z]) - print(res) - - """ - if isinstance(self.place, core.CUDAPlace): - places = [ - core.CUDAPlace(i) - for i in range(self._get_number_of_places_(num_places)) - ] - else: - places = [ - core.CPUPlace() - for _ in range(self._get_number_of_places_(num_places)) - ] - - if len(iterable) != len(places): - raise ValueError( - "feed_parallel takes multiple mini-batches. Each " - "mini-batch will be feed on each device. The " - "number of devices and number of mini-batches " - "must be same." 
- ) - - place = self.place - for p, batch in zip(places, iterable): - self.place = p - yield self.feed(batch) - self.place = place - def _get_number_of_places_(self, num_places): if num_places is not None: return int(num_places) @@ -537,88 +458,3 @@ class DataFeeder: return len(_cuda_ids()) else: return _cpu_num() - - def decorate_reader( - self, reader, multi_devices, num_places=None, drop_last=True - ): - """ - Decorate the reader (generator) to fit multiple devices. The reader generate - multiple mini-batches. Each mini-batch will be fed into a single device. - - Parameters: - reader(generator): a user defined python generator used to get :code:`mini-batch` of data. - A :code:`mini-batch` can be regarded as a python generator that returns batches of input - entities, just like the below :code:`_mini_batch` in the code example. - multi_devices(bool): indicate whether to use multiple devices or not. - num_places(int, optional): if :code:`multi_devices` is True, you can specify the number - of devices(CPU|GPU) to use, if multi_devices is None, the function will use all the - devices of the current machine. Default None. - drop_last(bool, optional): whether to drop the last round of data if it is not enough to - feed all devices. Default True. - - Returns: - :code:`generator`: a new :code:`generator` which return converted dicts that can be fed into Executor - - Raises: - :code:`ValueError`: If drop_last is False and the data cannot fit devices perfectly. - - Example: - .. code-block:: python - - import numpy as np - import paddle - import paddle.fluid as fluid - import paddle.fluid.compiler as compiler - - def reader(): - def _mini_batch(batch_size): - for i in range(batch_size): - yield np.random.random([16]).astype('float32'), np.random.randint(10, size=[1]) - - for _ in range(10): - yield _mini_batch(np.random.randint(1, 10)) - - place_num = 3 - places = [fluid.CPUPlace() for _ in range(place_num)] - - # a simple network sample - data = paddle.static.data(name='data', shape=[None, 4, 4], dtype='float32') - label = paddle.static.data(name='label', shape=[None, 1], dtype='int64') - hidden = paddle.static.nn.fc(x=data, size=10) - - feeder = fluid.DataFeeder(place=places[0], feed_list=[data, label]) - reader = feeder.decorate_reader(reader, multi_devices=True, num_places=3, drop_last=True) - - exe = fluid.Executor(places[0]) - exe.run(fluid.default_startup_program()) - compiled_prog = compiler.CompiledProgram( - fluid.default_main_program()).with_data_parallel(places=places) - - for i,data in enumerate(reader()): - # print data if you like - # print(i, data) - ret = exe.run(compiled_prog, feed=data, fetch_list=[hidden]) - print(ret) - - """ - - def __reader_creator__(): - if not multi_devices: - for item in reader(): - yield self.feed(item) - else: - num = self._get_number_of_places_(num_places) - item = [] - for batch in reader(): - item.append(batch) - if len(item) == num: - yield list(self.feed_parallel(item, num)) - item = [] - if not drop_last and len(item) != 0: - raise ValueError( - "The data batch which cannot fit for devices will be " - "dropped is not implementation. 
Other strategies are " - "not implemented" - ) - - return __reader_creator__ diff --git a/python/paddle/fluid/reader.py b/python/paddle/fluid/reader.py index d36542da09b..d863dd142e6 100644 --- a/python/paddle/fluid/reader.py +++ b/python/paddle/fluid/reader.py @@ -1550,7 +1550,12 @@ class GeneratorLoader(DataLoaderBase): feeder = DataFeeder( feed_list=self._feed_list, place=core.CPUPlace() ) - paddle_reader = feeder.decorate_reader(reader, multi_devices=False) + + def decorate_reader(): + for item in reader(): + yield feeder.feed(item) + + paddle_reader = decorate_reader def __tensor_reader_impl__(): for slots in paddle_reader(): diff --git a/python/paddle/fluid/tests/unittests/feed_data_reader.py b/python/paddle/fluid/tests/unittests/feed_data_reader.py index 355d8f4d0ba..66c034c799a 100644 --- a/python/paddle/fluid/tests/unittests/feed_data_reader.py +++ b/python/paddle/fluid/tests/unittests/feed_data_reader.py @@ -45,25 +45,6 @@ class FeedDataReader: feed_data[key] = value return feed_data - def _feed_parallel_executor(self, device_num): - feed_data = [] - for _ in range(device_num): - feed_data.append(self._feed_executor()) - - return feed_data - def get_next(self, exe, program): - result = [] assert isinstance(exe, fluid.Executor), "exe must be Executor" - use_cuda = isinstance(exe.place, fluid.CUDAPlace) - if isinstance(program, fluid.CompiledProgram): - use_executor = True - device_num = 1 - else: - use_executor = True - device_num = 1 - - if use_executor: - return self._feed_executor() - else: - return self._feed_parallel_executor(device_num) + return self._feed_executor() diff --git a/python/paddle/fluid/tests/unittests/ir_memory_optimize_net_base.py b/python/paddle/fluid/tests/unittests/ir_memory_optimize_net_base.py index 4927b200426..1fd7b2bc89e 100644 --- a/python/paddle/fluid/tests/unittests/ir_memory_optimize_net_base.py +++ b/python/paddle/fluid/tests/unittests/ir_memory_optimize_net_base.py @@ -76,7 +76,7 @@ class BuildIrMemOptBase(unittest.TestCase): # execution place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() feeder = fluid.DataFeeder(feed_list=[data, label], place=place) - reader = feeder.decorate_reader(self.train_reader, multi_devices=True) + reader = feeder.feed(self.train_reader()) exe = fluid.Executor(place) exe.run(fluid.default_startup_program()) diff --git a/python/paddle/fluid/tests/unittests/test_eager_deletion_dynamic_rnn_base.py b/python/paddle/fluid/tests/unittests/test_eager_deletion_dynamic_rnn_base.py index c1fb340a888..581b48183a2 100644 --- a/python/paddle/fluid/tests/unittests/test_eager_deletion_dynamic_rnn_base.py +++ b/python/paddle/fluid/tests/unittests/test_eager_deletion_dynamic_rnn_base.py @@ -47,7 +47,7 @@ def train(network, use_cuda, batch_size=32, pass_num=2): place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() feeder = fluid.DataFeeder(feed_list=[data, label], place=place) - reader = feeder.decorate_reader(train_reader, multi_devices=False) + reader = feeder.feed(train_reader()) exe = fluid.Executor(place) fluid.default_startup_program().random_seed = 1 -- GitLab
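Migration note (not part of the patch): downstream code that called :code:`DataFeeder.decorate_reader(reader, multi_devices=False)` can reproduce the same behaviour with a plain generator that wraps :code:`DataFeeder.feed`, which is exactly the replacement this patch makes inside :code:`GeneratorLoader` in :code:`reader.py`. The sketch below is a minimal, illustrative example only; the program, reader, and variable names (:code:`data`, :code:`label`, :code:`sample_reader`) are placeholders and do not come from the patch.

.. code-block:: python

    import numpy as np
    import paddle
    import paddle.fluid as fluid

    paddle.enable_static()

    # Feed targets of a small static-graph program (hypothetical example).
    data = paddle.static.data(name='data', shape=[None, 16], dtype='float32')
    label = paddle.static.data(name='label', shape=[None, 1], dtype='int64')

    place = fluid.CPUPlace()
    feeder = fluid.DataFeeder(feed_list=[data, label], place=place)

    def sample_reader():
        # Placeholder reader: each yielded item is one mini-batch,
        # i.e. a list of (data, label) samples.
        for _ in range(10):
            yield [
                (
                    np.random.random([16]).astype('float32'),
                    np.random.randint(10, size=[1]).astype('int64'),
                )
                for _ in range(4)
            ]

    # Before this patch:
    #     reader = feeder.decorate_reader(sample_reader, multi_devices=False)
    # After this patch, wrap feed() directly:
    def reader():
        for mini_batch in sample_reader():
            yield feeder.feed(mini_batch)

    for feed_dict in reader():
        # feed_dict maps variable names to converted tensors and can be
        # passed as the ``feed`` argument of Executor.run().
        print(sorted(feed_dict.keys()))

The patch provides no drop-in substitute for the multi-device path (:code:`feed_parallel` / :code:`decorate_reader(multi_devices=True)`); the unit tests touched here simply switch to single-device feeding.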