diff --git a/python/paddle/fluid/data_feeder.py b/python/paddle/fluid/data_feeder.py
index ac02401c79b787716b2e5f43e0d1c5686cf2bd13..0051b698471b40bffc12921f86dcde642714e07d 100644
--- a/python/paddle/fluid/data_feeder.py
+++ b/python/paddle/fluid/data_feeder.py
@@ -16,6 +16,7 @@ from __future__ import print_function
 import core
 import numpy
 import six.moves as six
+import multiprocessing
 
 from framework import Variable, default_main_program
 
@@ -116,3 +117,60 @@ class DataFeeder(object):
         for each_name, each_converter in six.zip(self.feed_names, converter):
             ret_dict[each_name] = each_converter.done()
         return ret_dict
+
+    def feed_parallel(self, iterable, num_places=None):
+        if isinstance(self.place, core.CUDAPlace):
+            places = [
+                core.CUDAPlace(i)
+                for i in six.xrange(self._get_number_of_places_(num_places))
+            ]
+        else:
+            places = [
+                core.CPUPlace()
+                for _ in six.xrange(self._get_number_of_places_(num_places))
+            ]
+
+        if len(iterable) != len(places):
+            raise ValueError("feed_parallel takes multiple mini-batches. Each "
+                             "mini-batch will be fed to one device. The "
+                             "number of devices and the number of mini-batches "
+                             "must be the same.")
+
+        place = self.place
+        for p, batch in six.zip(places, iterable):
+            self.place = p
+            yield self.feed(batch)
+        self.place = place
+
+    def _get_number_of_places_(self, num_places):
+        if num_places is not None:
+            return int(num_places)
+        elif isinstance(self.place, core.CUDAPlace):
+            return core.get_cuda_device_count()
+        else:
+            return multiprocessing.cpu_count()
+
+    def decorate_reader(self,
+                        reader,
+                        multi_devices,
+                        num_places=None,
+                        drop_last=True):
+        def __reader_creator__():
+            if not multi_devices:
+                for item in reader():
+                    yield self.feed(item)
+            else:
+                num = self._get_number_of_places_(num_places)
+                item = []
+                for batch in reader():
+                    item.append(batch)
+                    if len(item) == num:
+                        yield list(self.feed_parallel(item, num))
+                        item = []
+                if not drop_last and len(item) != 0:
+                    raise ValueError(
+                        "The leftover mini-batches that cannot fill all "
+                        "devices are dropped. Other strategies for the "
+                        "leftover mini-batches are not implemented.")
+
+        return __reader_creator__
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index 4eb25a6e00b7564ac17db568ec78c1c84933af43..6db3a13718d0b258b297261f9df6457ad2035e3e 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -796,5 +796,42 @@ class TestFetchOp(unittest.TestCase):
         self.parallel_exe(train_inputs, seed=1)
 
 
+class TestFeedParallel(unittest.TestCase):
+    def test_main(self):
+        main = fluid.Program()
+        startup = fluid.Program()
+        startup.random_seed = 1
+        with fluid.scope_guard(fluid.core.Scope()):
+            with fluid.program_guard(main, startup):
+                data = fluid.layers.data(
+                    name='image', shape=[3, 224, 224], dtype='float32')
+                label = fluid.layers.data(
+                    name='label', shape=[1], dtype='int64')
+                out = Lenet(data, class_dim=102)
+                loss = fluid.layers.cross_entropy(input=out, label=label)
+                loss = fluid.layers.mean(loss)
+                opt = fluid.optimizer.Momentum(
+                    learning_rate=0.1,
+                    momentum=0.9,
+                    regularization=fluid.regularizer.L2Decay(1e-4))
+
+                opt.minimize(loss)
+        place = fluid.CUDAPlace(0)
+        feeder = fluid.DataFeeder(place=place, feed_list=[data, label])
+        reader = feeder.decorate_reader(
+            paddle.batch(
+                flowers.train(), batch_size=16), multi_devices=True)
+        exe = fluid.Executor(place)
+        exe.run(startup)
+        pe = fluid.ParallelExecutor(
+            use_cuda=True, loss_name=loss.name, main_program=main)
+
+        for batch_id, data in enumerate(reader()):
+            loss_np = np.array(pe.run(feed=data, fetch_list=[loss.name])[0])
+            print batch_id, loss_np
+            if batch_id == 2:
+                break
+
+
 if __name__ == '__main__':
     unittest.main()
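
A minimal sketch of how the new decorate_reader API might be used outside the GPU test above. The toy_batch_reader, the layer shapes, and num_places=2 are illustrative assumptions, not part of the patch.

    import numpy
    import paddle.fluid as fluid

    image = fluid.layers.data(name='image', shape=[8], dtype='float32')
    label = fluid.layers.data(name='label', shape=[1], dtype='int64')

    place = fluid.CPUPlace()
    feeder = fluid.DataFeeder(place=place, feed_list=[image, label])

    # Toy in-memory reader standing in for paddle.batch(some_reader, ...):
    # every call yields mini-batches of (feature, label) tuples.
    def toy_batch_reader():
        for _ in range(4):
            yield [(numpy.random.random(8).astype('float32'), 1)
                   for _ in range(16)]

    # multi_devices=False would yield one feed dict per mini-batch for
    # Executor.run. With multi_devices=True each item is a list of feed
    # dicts, one per place, as expected by ParallelExecutor.run; num_places
    # is pinned to 2 here instead of the cpu_count()/CUDA-device default.
    parallel_reader = feeder.decorate_reader(
        toy_batch_reader, multi_devices=True, num_places=2)

    for feed_list in parallel_reader():
        print(len(feed_list))  # 2 feed dicts, one per simulated device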