提交 495368c2 编写于 作者: C chengduoZH

ADD CPU_NUM

上级 d09fd1f6
...@@ -131,7 +131,7 @@ void AllReduceOpHandle::RunImpl() { ...@@ -131,7 +131,7 @@ void AllReduceOpHandle::RunImpl() {
} }
} }
std::string AllReduceOpHandle::Name() const { return "nccl_all_reduce"; } std::string AllReduceOpHandle::Name() const { return "all_reduce"; }
} // namespace details } // namespace details
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -119,7 +119,8 @@ def reader_creator(data_file, ...@@ -119,7 +119,8 @@ def reader_creator(data_file,
yield sample, int(label) - 1 yield sample, int(label) - 1
if use_xmap: if use_xmap:
return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size) cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
return xmap_readers(mapper, reader, cpu_num, buffered_size)
else: else:
return map_readers(mapper, reader) return map_readers(mapper, reader)
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
from __future__ import print_function from __future__ import print_function
import core import core
import numpy import numpy
import os
import six.moves as six import six.moves as six
import multiprocessing import multiprocessing
...@@ -150,7 +151,9 @@ class DataFeeder(object): ...@@ -150,7 +151,9 @@ class DataFeeder(object):
elif isinstance(self.place, core.CUDAPlace): elif isinstance(self.place, core.CUDAPlace):
return core.get_cuda_device_count() return core.get_cuda_device_count()
else: else:
return min(4, multiprocessing.cpu_count()) cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
return cpu_num
def decorate_reader(self, def decorate_reader(self,
reader, reader,
......
...@@ -18,6 +18,7 @@ import framework ...@@ -18,6 +18,7 @@ import framework
import executor import executor
import warnings import warnings
import sys import sys
import os
__all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] __all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy']
...@@ -101,7 +102,9 @@ class ParallelExecutor(object): ...@@ -101,7 +102,9 @@ class ParallelExecutor(object):
p.set_place(self._act_places[-1]) p.set_place(self._act_places[-1])
self._places.append(p) self._places.append(p)
else: else:
for i in xrange(min(4, multiprocessing.cpu_count())): cpu_num = int(
os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
for i in xrange(cpu_num):
p = core.Place() p = core.Place()
self._act_places.append(core.CPUPlace()) self._act_places.append(core.CPUPlace())
p.set_place(self._act_places[-1]) p.set_place(self._act_places[-1])
...@@ -118,8 +121,9 @@ class ParallelExecutor(object): ...@@ -118,8 +121,9 @@ class ParallelExecutor(object):
# performance. Worth tuning for other models in the future. # performance. Worth tuning for other models in the future.
exec_strategy.num_threads = len(self._places) * 2 exec_strategy.num_threads = len(self._places) * 2
else: else:
exec_strategy.num_threads = min( cpu_num = int(
len(self._places) * 2, multiprocessing.cpu_count()) os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
exec_strategy.num_threads = min(len(self._places) * 2, cpu_num)
if build_strategy is None: if build_strategy is None:
build_strategy = BuildStrategy() build_strategy = BuildStrategy()
......
...@@ -18,6 +18,7 @@ import paddle.fluid as fluid ...@@ -18,6 +18,7 @@ import paddle.fluid as fluid
import unittest import unittest
import numpy as np import numpy as np
import paddle import paddle
import os
def Lenet(data, class_dim): def Lenet(data, class_dim):
...@@ -89,6 +90,7 @@ class TestFetchOp(unittest.TestCase): ...@@ -89,6 +90,7 @@ class TestFetchOp(unittest.TestCase):
train_inputs.append(tst_reader_iter.next()) train_inputs.append(tst_reader_iter.next())
self.parallel_exe(train_inputs, seed=1, use_cuda=True) self.parallel_exe(train_inputs, seed=1, use_cuda=True)
os.environ['CPU_NUM'] = str(4)
self.parallel_exe(train_inputs, seed=1, use_cuda=False) self.parallel_exe(train_inputs, seed=1, use_cuda=False)
...@@ -133,6 +135,7 @@ class TestFeedParallel(unittest.TestCase): ...@@ -133,6 +135,7 @@ class TestFeedParallel(unittest.TestCase):
def test_feed_op(self): def test_feed_op(self):
self.parallel_exe(use_cuda=True, seed=1) self.parallel_exe(use_cuda=True, seed=1)
os.environ['CPU_NUM'] = str(4)
self.parallel_exe(use_cuda=False, seed=1) self.parallel_exe(use_cuda=False, seed=1)
......
...@@ -18,6 +18,7 @@ import numpy as np ...@@ -18,6 +18,7 @@ import numpy as np
import paddle import paddle
import paddle.dataset.mnist as mnist import paddle.dataset.mnist as mnist
import unittest import unittest
import os
MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio" MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio"
...@@ -85,6 +86,7 @@ def fc_with_batchnorm(use_feed): ...@@ -85,6 +86,7 @@ def fc_with_batchnorm(use_feed):
class TestMNIST(TestParallelExecutorBase): class TestMNIST(TestParallelExecutorBase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
# Convert mnist to recordio file # Convert mnist to recordio file
with fluid.program_guard(fluid.Program(), fluid.Program()): with fluid.program_guard(fluid.Program(), fluid.Program()):
reader = paddle.batch(mnist.train(), batch_size=4) reader = paddle.batch(mnist.train(), batch_size=4)
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
import paddle.fluid as fluid import paddle.fluid as fluid
from parallel_executor_test_base import TestParallelExecutorBase from parallel_executor_test_base import TestParallelExecutorBase
import unittest import unittest
import os
def squeeze_excitation(input, num_channels, reduction_ratio): def squeeze_excitation(input, num_channels, reduction_ratio):
...@@ -145,10 +146,12 @@ class TestResnet(TestParallelExecutorBase): ...@@ -145,10 +146,12 @@ class TestResnet(TestParallelExecutorBase):
) )
def test_resnet(self): def test_resnet(self):
# os.environ['CPU_NUM'] = str(4)
self.check_resnet_convergence(False, use_cuda=True) self.check_resnet_convergence(False, use_cuda=True)
# self.check_resnet_convergence(False,use_cuda=False) # self.check_resnet_convergence(False,use_cuda=False)
def test_resnet_with_new_strategy(self): def test_resnet_with_new_strategy(self):
os.environ['CPU_NUM'] = str(4)
self.check_resnet_convergence(True, use_cuda=True) self.check_resnet_convergence(True, use_cuda=True)
self.check_resnet_convergence(True, use_cuda=False) self.check_resnet_convergence(True, use_cuda=False)
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
import paddle.fluid as fluid import paddle.fluid as fluid
import numpy as np import numpy as np
import unittest import unittest
import os
def simple_fc_net(): def simple_fc_net():
...@@ -36,6 +37,7 @@ def simple_fc_net(): ...@@ -36,6 +37,7 @@ def simple_fc_net():
class ParallelExecutorTestingDuringTraining(unittest.TestCase): class ParallelExecutorTestingDuringTraining(unittest.TestCase):
def check_network_convergence(self, use_cuda, build_strategy=None): def check_network_convergence(self, use_cuda, build_strategy=None):
os.environ['CPU_NUM'] = str(4)
main = fluid.Program() main = fluid.Program()
startup = fluid.Program() startup = fluid.Program()
with fluid.program_guard(main, startup): with fluid.program_guard(main, startup):
......
...@@ -19,6 +19,7 @@ from parallel_executor_test_base import TestParallelExecutorBase ...@@ -19,6 +19,7 @@ from parallel_executor_test_base import TestParallelExecutorBase
import unittest import unittest
import paddle import paddle
import paddle.dataset.wmt16 as wmt16 import paddle.dataset.wmt16 as wmt16
import os
WMT16_RECORDIO_FILE = "./wmt16_test_pe.recordio" WMT16_RECORDIO_FILE = "./wmt16_test_pe.recordio"
...@@ -149,6 +150,7 @@ def transformer(use_feed): ...@@ -149,6 +150,7 @@ def transformer(use_feed):
class TestTransformer(TestParallelExecutorBase): class TestTransformer(TestParallelExecutorBase):
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
reader = paddle.batch( reader = paddle.batch(
wmt16.train(ModelHyperParams.src_vocab_size, wmt16.train(ModelHyperParams.src_vocab_size,
ModelHyperParams.trg_vocab_size), ModelHyperParams.trg_vocab_size),
......
...@@ -119,7 +119,8 @@ def reader_creator(data_file, ...@@ -119,7 +119,8 @@ def reader_creator(data_file,
yield sample, int(label) - 1 yield sample, int(label) - 1
if use_xmap: if use_xmap:
return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size) cpu_num = int(os.environ.get('CPU_NUM', cpu_count()))
return xmap_readers(mapper, reader, cpu_num, buffered_size)
else: else:
return map_readers(mapper, reader) return map_readers(mapper, reader)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册