diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index 07cc1e29341bd497e88097a9ee5653631b79d734..fbdd6fd449625a21f91758dc12490b02070aea1a 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -16,6 +16,7 @@ import core
 import multiprocessing
 import framework
 import executor
+import warnings
 import sys
 
 __all__ = ['ParallelExecutor']
@@ -62,8 +63,8 @@ class ParallelExecutor(object):
                               main_program=test_program,
                               share_vars_from=train_exe)
 
-          train_loss, = train_exe.run([loss.name], feed_dict=feed_dict)
-          test_loss, = test_exe.run([loss.name], feed_dict=feed_dict)
+          train_loss, = train_exe.run([loss.name], feed=feed_dict)
+          test_loss, = test_exe.run([loss.name], feed=feed_dict)
         """
 
         self._places = []
@@ -103,8 +104,8 @@ class ParallelExecutor(object):
 
         self.persistable_vars = [
             v.name
-            for v in filter(lambda var: \
-                var.persistable and var.type != core.VarDesc.VarType.RAW,
+            for v in filter(
+                lambda var: var.persistable and var.type != core.VarDesc.VarType.RAW,
                 main.list_vars())
         ]
 
@@ -163,7 +164,7 @@ class ParallelExecutor(object):
 
         Returns: fetched result list.
         """
-        if feed is None:
+        if feed is None and feed_dict is not None:
             feed = feed_dict
             print >> sys.stderr, "`feed_dict` is deprecated. Please use `feed=`"
 
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor.py b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
index 3ddafbbc57b29d506158bcb57188ab96f814e0d3..c783a142467f3f6a9cd210425acfc526a32a6f71 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor.py
@@ -200,14 +200,29 @@ class TestParallelExecutorBase(unittest.TestCase):
     def check_network_convergence(self,
                                   method,
                                   memory_opt=True,
-                                  iter=10,
+                                  iter=50,
                                   batch_size=None,
                                   allow_op_delay=False,
-                                  feed_dict=None):
+                                  feed_dict=None,
+                                  seed=None,
+                                  use_parallel_executor=True):
+        def run_executor(exe, feed, fetch_list, program=None):
+            if isinstance(exe, fluid.ParallelExecutor):
+                res = exe.run(fetch_list=fetch_list, feed=feed)
+            elif isinstance(exe, fluid.Executor):
+                if program is None:
+                    program = fluid.default_main_program()
+                res = exe.run(program=program, feed=feed, fetch_list=fetch_list)
+            else:
+                raise ValueError('Unknown type of exe')
+            return res
+
         main = fluid.Program()
         startup = fluid.Program()
         startup.random_seed = 1  # Fix random seed
         with fluid.program_guard(main, startup):
+            if seed is not None:
+                startup.random_seed = seed
             loss = method(use_feed=feed_dict is not None)
             adam = fluid.optimizer.Adam()
             adam.minimize(loss)
@@ -217,18 +232,24 @@ class TestParallelExecutorBase(unittest.TestCase):
         startup_exe = fluid.Executor(place)
         startup_exe.run(startup)
 
-        exe = fluid.ParallelExecutor(
-            True, loss_name=loss.name, allow_op_delay=allow_op_delay)
+        if use_parallel_executor:
+            exe = fluid.ParallelExecutor(
+                True, loss_name=loss.name, allow_op_delay=allow_op_delay)
+        else:
+            exe = fluid.Executor(place=place)
+
         if batch_size is not None:
             batch_size *= fluid.core.get_cuda_device_count()
         begin = time.time()
-        first_loss, = exe.run([loss.name], feed=feed_dict)
+        first_loss, = run_executor(
+            exe=exe, feed=feed_dict, fetch_list=[loss.name])
         first_loss = numpy.array(first_loss)
 
         for i in xrange(iter):
-            exe.run([], feed=feed_dict)
+            run_executor(exe=exe, feed=feed_dict, fetch_list=[])
 
-        last_loss, = exe.run([loss.name], feed=feed_dict)
+        last_loss, = run_executor(
+            exe=exe, feed=feed_dict, fetch_list=[loss.name])
         end = time.time()
 
         if batch_size is not None:
@@ -239,6 +260,7 @@ class TestParallelExecutorBase(unittest.TestCase):
 
         print first_loss, last_loss
         # self.assertGreater(first_loss[0], last_loss[0])
+        return first_loss, last_loss
 
 
 class TestMNIST(TestParallelExecutorBase):
@@ -268,6 +290,27 @@ class TestMNIST(TestParallelExecutorBase):
             simple_fc_net, feed_dict={"image": img,
                                       "label": label})
 
+    def test_simple_fc_parallel_accuracy(self):
+        img = numpy.zeros(shape=[32, 784], dtype='float32')
+        label = numpy.ones(shape=[32, 1], dtype='int64')
+        single_first_loss, single_last_loss = self.check_network_convergence(
+            method=simple_fc_net,
+            seed=1000,
+            feed_dict={"image": img,
+                       "label": label},
+            use_parallel_executor=False)
+        parallel_first_loss, parallel_last_loss = self.check_network_convergence(
+            method=simple_fc_net,
+            seed=1000,
+            feed_dict={"image": img,
+                       "label": label},
+            use_parallel_executor=True)
+
+        for p_f in parallel_first_loss:
+            self.assertAlmostEqual(p_f, single_first_loss[0], delta=1e-6)
+        for p_l in parallel_last_loss:
+            self.assertAlmostEqual(p_l, single_last_loss[0], delta=1e-6)
+
     def test_batchnorm_fc(self):
         self.check_network_convergence(fc_with_batchnorm)
         img = numpy.zeros(shape=[32, 784], dtype='float32')
@@ -496,10 +539,10 @@ class ParallelExecutorTestingDuringTraining(unittest.TestCase):
             share_vars_from=train_exe)
 
         for i in xrange(5):
-            test_loss, = test_exe.run([loss.name], feed_dict=feed_dict)
+            test_loss, = test_exe.run([loss.name], feed=feed_dict)
             test_loss = numpy.array(test_loss)
 
-            train_loss, = train_exe.run([loss.name], feed_dict=feed_dict)
+            train_loss, = train_exe.run([loss.name], feed=feed_dict)
             train_loss = numpy.array(train_loss)
             self.assertTrue(
                 numpy.allclose(
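
Usage sketch for the `feed=` change above (not part of the patch): after this diff, `ParallelExecutor.run` takes its inputs via `feed=`, matching `Executor.run`, while the old `feed_dict=` keyword still works but prints a deprecation notice to stderr. The snippet below is a minimal illustration assuming a CUDA build of Paddle at roughly this revision; the network (`image`, `hidden`, `loss`) is an arbitrary example, not taken from the patch.

    import numpy
    import paddle.fluid as fluid

    # Toy network: one fc layer whose mean output serves as the loss.
    image = fluid.layers.data(name='image', shape=[784], dtype='float32')
    hidden = fluid.layers.fc(input=image, size=10)
    loss = fluid.layers.mean(x=hidden)
    fluid.optimizer.Adam().minimize(loss)

    # Run the startup program once with a plain Executor, as the tests do.
    startup_exe = fluid.Executor(fluid.CUDAPlace(0))
    startup_exe.run(fluid.default_startup_program())

    exe = fluid.ParallelExecutor(True, loss_name=loss.name)
    feed = {'image': numpy.random.random([32, 784]).astype('float32')}

    # New spelling: pass inputs with `feed=`.
    loss_v, = exe.run([loss.name], feed=feed)
    # Old spelling still runs, but warns "`feed_dict` is deprecated" on stderr.
    loss_v, = exe.run([loss.name], feed_dict=feed)
    print(numpy.array(loss_v))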