Commit a56dcf51, authored by chengduoZH

fix parallel_executor.py and xx_mnist.py

Parent: 3ff9ba0e
@@ -119,7 +119,7 @@ def reader_creator(data_file,
             yield sample, int(label) - 1
 
     if use_xmap:
-        return xmap_readers(mapper, reader, cpu_count(), buffered_size)
+        return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size)
     else:
         return map_readers(mapper, reader)
 
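This hunk, together with the matching edits to DataFeeder and ParallelExecutor further down, caps CPU fan-out at four workers instead of one worker per core. A minimal sketch of the capping pattern, using only the standard library (xmap_readers itself is Paddle-specific and not reproduced; the helper name is illustrative, not from the patch):

from multiprocessing import cpu_count

def capped_worker_count(limit=4):
    # Never spawn more than `limit` workers, even on many-core machines,
    # but also never ask for more workers than the machine actually has.
    return min(limit, cpu_count())

# e.g. xmap_readers(mapper, reader, capped_worker_count(), buffered_size)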
@@ -150,7 +150,7 @@ class DataFeeder(object):
         elif isinstance(self.place, core.CUDAPlace):
             return core.get_cuda_device_count()
         else:
-            return multiprocessing.cpu_count()
+            return min(4, multiprocessing.cpu_count())
 
     def decorate_reader(self,
                         reader,
@@ -101,7 +101,7 @@ class ParallelExecutor(object):
                 p.set_place(self._act_places[-1])
                 self._places.append(p)
         else:
-            for i in xrange(multiprocessing.cpu_count()):
+            for i in xrange(min(4, multiprocessing.cpu_count())):
                 p = core.Place()
                 self._act_places.append(core.CPUPlace())
                 p.set_place(self._act_places[-1])
@@ -110,10 +110,7 @@ class ParallelExecutor(object):
 
         if exec_strategy is None:
             exec_strategy = ExecutionStrategy()
-            if use_cuda:
-                exec_strategy.use_event = True
-            else:
-                exec_strategy.use_event = False
+            exec_strategy.use_event = use_cuda
 
         if exec_strategy.num_threads == 0:
             if use_cuda:
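The four-line branch that set use_event is collapsed into a single assignment: the attribute should simply mirror use_cuda, so the flag can be stored directly. A self-contained before/after illustration of the equivalence (the stand-in class below is hypothetical; only the use_event attribute matters):

class _Strategy(object):
    # Stand-in for ExecutionStrategy; not the real Paddle class.
    use_event = False

use_cuda = True
exec_strategy = _Strategy()

# Before: branch on the flag and assign a literal in each arm.
if use_cuda:
    exec_strategy.use_event = True
else:
    exec_strategy.use_event = False

# After: the flag already is the boolean to store, so assign it directly.
exec_strategy.use_event = use_cuda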
@@ -23,6 +23,7 @@ __all__ = ['TestParallelExecutorBase']
 class TestParallelExecutorBase(unittest.TestCase):
     def check_network_convergence(self,
                                   method,
+                                  use_cuda=True,
                                   memory_opt=True,
                                   iter=50,
                                   batch_size=None,
@@ -53,7 +54,7 @@ class TestParallelExecutorBase(unittest.TestCase):
             adam.minimize(loss)
             if memory_opt:
                 fluid.memory_optimize(main)
-            place = fluid.CUDAPlace(0)
+            place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
             startup_exe = fluid.Executor(place)
             startup_exe.run(startup)
             exec_strategy = fluid.ExecutionStrategy()
@@ -64,7 +65,7 @@ class TestParallelExecutorBase(unittest.TestCase):
 
             if use_parallel_executor:
                 exe = fluid.ParallelExecutor(
-                    True,
+                    use_cuda,
                     loss_name=loss.name,
                     exec_strategy=exec_strategy,
                     build_strategy=build_strategy)
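These two hunks thread the new use_cuda flag through the test base: it chooses the device place for the startup executor and replaces the hard-coded True previously passed to fluid.ParallelExecutor. A condensed sketch of that flow, with a hypothetical helper name and the surrounding program/strategy setup assumed from the test file:

import paddle.fluid as fluid

def build_parallel_exe(loss, startup, exec_strategy, build_strategy, use_cuda=True):
    # The device now follows the flag instead of always being GPU 0.
    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    startup_exe = fluid.Executor(place)
    startup_exe.run(startup)
    # The same flag is forwarded to ParallelExecutor instead of a literal True.
    return fluid.ParallelExecutor(
        use_cuda,
        loss_name=loss.name,
        exec_strategy=exec_strategy,
        build_strategy=build_strategy)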
@@ -99,7 +99,9 @@ class TestMNIST(TestParallelExecutorBase):
             fluid.recordio_writer.convert_reader_to_recordio_file(
                 MNIST_RECORDIO_FILE, reader, feeder)
 
-    def check_simple_fc_convergence(self, balance_parameter_opt_between_cards):
+    def check_simple_fc_convergence(self,
+                                    balance_parameter_opt_between_cards,
+                                    use_cuda=True):
         self.check_network_convergence(simple_fc_net)
         self.check_network_convergence(simple_fc_net, allow_op_delay=True)
@@ -109,17 +111,19 @@ class TestMNIST(TestParallelExecutorBase):
             simple_fc_net,
             feed_dict={"image": img,
                        "label": label},
+            use_cuda=use_cuda,
             balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
         )
 
     def test_simple_fc(self):
-        self.check_simple_fc_convergence(False)
+        self.check_simple_fc_convergence(False, use_cuda=True)
 
     def test_simple_fc_with_new_strategy(self):
-        self.check_simple_fc_convergence(True)
+        self.check_simple_fc_convergence(True, use_cuda=True)
 
     def check_simple_fc_parallel_accuracy(self,
-                                          balance_parameter_opt_between_cards):
+                                          balance_parameter_opt_between_cards,
+                                          use_cuda=True):
         img = np.zeros(shape=[32, 784], dtype='float32')
         label = np.ones(shape=[32, 1], dtype='int64')
         single_first_loss, single_last_loss = self.check_network_convergence(
@@ -127,6 +131,7 @@ class TestMNIST(TestParallelExecutorBase):
             seed=1000,
             feed_dict={"image": img,
                        "label": label},
+            use_cuda=use_cuda,
             use_parallel_executor=False)
         parallel_first_loss, parallel_last_loss = self.check_network_convergence(
             method=simple_fc_net,
@@ -143,13 +148,15 @@ class TestMNIST(TestParallelExecutorBase):
             self.assertAlmostEquals(p_l, single_last_loss[0], delta=1e-6)
 
     def test_simple_fc_parallel_accuracy(self):
-        self.check_simple_fc_parallel_accuracy(False)
+        self.check_simple_fc_parallel_accuracy(False, use_cuda=True)
+        self.check_simple_fc_parallel_accuracy(False, use_cuda=False)
 
     def test_simple_fc_parallel_accuracy_with_new_strategy(self):
-        self.check_simple_fc_parallel_accuracy(True)
+        self.check_simple_fc_parallel_accuracy(True, use_cuda=True)
+        self.check_simple_fc_parallel_accuracy(True, use_cuda=False)
 
-    def check_batchnorm_fc_convergence(self,
-                                       balance_parameter_opt_between_cards):
+    def check_batchnorm_fc_convergence(
+            self, balance_parameter_opt_between_cards, use_cuda):
         self.check_network_convergence(fc_with_batchnorm)
         img = np.zeros(shape=[32, 784], dtype='float32')
         label = np.ones(shape=[32, 1], dtype='int64')
@@ -157,14 +164,17 @@ class TestMNIST(TestParallelExecutorBase):
             fc_with_batchnorm,
             feed_dict={"image": img,
                        "label": label},
+            use_cuda=use_cuda,
             balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
         )
 
     def test_batchnorm_fc(self):
-        self.check_batchnorm_fc_convergence(False)
+        self.check_batchnorm_fc_convergence(False, use_cuda=True)
+        self.check_batchnorm_fc_convergence(False, use_cuda=False)
 
     def test_batchnorm_fc_with_new_strategy(self):
-        self.check_batchnorm_fc_convergence(True)
+        self.check_batchnorm_fc_convergence(True, use_cuda=True)
+        self.check_batchnorm_fc_convergence(True, use_cuda=False)
 
 
 if __name__ == '__main__':
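The net effect in the MNIST tests is that every check_* helper accepts a use_cuda flag and the test_* methods exercise it on both devices. A minimal, self-contained sketch of that parameterization pattern (hypothetical test class, not the actual Paddle suite):

import unittest

class DeviceParamTest(unittest.TestCase):
    def _check_convergence(self, balance_parameter_opt_between_cards, use_cuda=True):
        # Placeholder body: the real helper builds the network and runs it on
        # CUDAPlace(0) or CPUPlace() depending on use_cuda.
        self.assertIsInstance(use_cuda, bool)
        self.assertIsInstance(balance_parameter_opt_between_cards, bool)

    def test_convergence_on_both_devices(self):
        # One helper, called once per device, mirroring test_batchnorm_fc above.
        self._check_convergence(False, use_cuda=True)
        self._check_convergence(False, use_cuda=False)

if __name__ == '__main__':
    unittest.main()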
@@ -119,7 +119,7 @@ def reader_creator(data_file,
             yield sample, int(label) - 1
 
     if use_xmap:
-        return xmap_readers(mapper, reader, cpu_count(), buffered_size)
+        return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size)
     else:
         return map_readers(mapper, reader)
 