diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc index aafb5bf07d9af92775f249c5f8e69330736c9696..74801e2a0d338048add64d155ca8bcb2bd404cc3 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc @@ -131,7 +131,7 @@ void AllReduceOpHandle::RunImpl() { } } -std::string AllReduceOpHandle::Name() const { return "nccl_all_reduce"; } +std::string AllReduceOpHandle::Name() const { return "all_reduce"; } } // namespace details } // namespace framework } // namespace paddle diff --git a/python/paddle/dataset/flowers.py b/python/paddle/dataset/flowers.py index 0d1eaeed0971e514fc4368e2f58ba844bd8118ae..527044b415533cc640e3cfc5837c08ab0f8b74b1 100644 --- a/python/paddle/dataset/flowers.py +++ b/python/paddle/dataset/flowers.py @@ -119,7 +119,8 @@ def reader_creator(data_file, yield sample, int(label) - 1 if use_xmap: - return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size) + cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) + return xmap_readers(mapper, reader, cpu_num, buffered_size) else: return map_readers(mapper, reader) diff --git a/python/paddle/fluid/data_feeder.py b/python/paddle/fluid/data_feeder.py index 0fd696510e5dcf57b95e92e430feb27a8aedd3f8..e2013137b14f73bb0fcfb57b4bdc35fcc043bdc0 100644 --- a/python/paddle/fluid/data_feeder.py +++ b/python/paddle/fluid/data_feeder.py @@ -15,6 +15,7 @@ from __future__ import print_function import core import numpy +import os import six.moves as six import multiprocessing @@ -150,7 +151,9 @@ class DataFeeder(object): elif isinstance(self.place, core.CUDAPlace): return core.get_cuda_device_count() else: - return min(4, multiprocessing.cpu_count()) + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + return cpu_num def decorate_reader(self, reader, diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 70437399d72c25043d1ed929c1dad1190d55cb83..d5445f0e17e87464bfc41635b1f565fa7a9ec64d 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -18,6 +18,7 @@ import framework import executor import warnings import sys +import os __all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] @@ -101,7 +102,9 @@ class ParallelExecutor(object): p.set_place(self._act_places[-1]) self._places.append(p) else: - for i in xrange(min(4, multiprocessing.cpu_count())): + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + for i in xrange(cpu_num): p = core.Place() self._act_places.append(core.CPUPlace()) p.set_place(self._act_places[-1]) @@ -118,8 +121,9 @@ class ParallelExecutor(object): # performance. Worth tunning for other models in the future. exec_strategy.num_threads = len(self._places) * 2 else: - exec_strategy.num_threads = min( - len(self._places) * 2, multiprocessing.cpu_count()) + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + exec_strategy.num_threads = min(len(self._places) * 2, cpu_num) if build_strategy is None: build_strategy = BuildStrategy() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py index 5c26cd4894ce90d837260972a8380ee8f383c37d..3814e035cfcb75a87cde3dff886d03d0655dadc6 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py @@ -18,6 +18,7 @@ import paddle.fluid as fluid import unittest import numpy as np import paddle +import os def Lenet(data, class_dim): @@ -89,6 +90,7 @@ class TestFetchOp(unittest.TestCase): train_inputs.append(tst_reader_iter.next()) self.parallel_exe(train_inputs, seed=1, use_cuda=True) + os.environ['CPU_NUM'] = str(4) self.parallel_exe(train_inputs, seed=1, use_cuda=False) @@ -133,6 +135,7 @@ class TestFeedParallel(unittest.TestCase): def test_feed_op(self): self.parallel_exe(use_cuda=True, seed=1) + os.environ['CPU_NUM'] = str(4) self.parallel_exe(use_cuda=False, seed=1) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py index 3bc846d125e3bc749ed44384771472c9bb1271eb..9b11a69912e6f6d7deffcc67502ce4b161c44ea3 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py @@ -18,6 +18,7 @@ import numpy as np import paddle import paddle.dataset.mnist as mnist import unittest +import os MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio" @@ -85,6 +86,7 @@ def fc_with_batchnorm(use_feed): class TestMNIST(TestParallelExecutorBase): @classmethod def setUpClass(cls): + os.environ['CPU_NUM'] = str(4) # Convert mnist to recordio file with fluid.program_guard(fluid.Program(), fluid.Program()): reader = paddle.batch(mnist.train(), batch_size=4) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py index ef6f3b99beff92e988d0f72e0eb55fa0b1569bc6..e3e497c899f9e6ee4c2218f3414ba2e1bc89123c 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py @@ -15,6 +15,7 @@ import paddle.fluid as fluid from parallel_executor_test_base import TestParallelExecutorBase import unittest +import os def squeeze_excitation(input, num_channels, reduction_ratio): @@ -145,10 +146,12 @@ class TestResnet(TestParallelExecutorBase): ) def test_resnet(self): + # os.environ['CPU_NUM'] = str(4) self.check_resnet_convergence(False, use_cuda=True) # self.check_resnet_convergence(False,use_cuda=False) def test_resnet_with_new_strategy(self): + os.environ['CPU_NUM'] = str(4) self.check_resnet_convergence(True, use_cuda=True) self.check_resnet_convergence(True, use_cuda=False) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py index 1f2555d972f5bce11fb0be4b3f7fc364c910b83a..31ba8c1d6096c9c89e0695c8eca8e16a5e303a61 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py @@ -15,6 +15,7 @@ import paddle.fluid as fluid import numpy as np import unittest +import os def simple_fc_net(): @@ -36,6 +37,7 @@ def simple_fc_net(): class ParallelExecutorTestingDuringTraining(unittest.TestCase): def check_network_convergence(self, use_cuda, build_strategy=None): + os.environ['CPU_NUM'] = str(4) main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py index 3e2c37191feab9a0d0f9c91561be8b978d574792..b6215fddb11bb6b3a76b5a6395e7254d21971c13 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py @@ -19,6 +19,7 @@ from parallel_executor_test_base import TestParallelExecutorBase import unittest import paddle import paddle.dataset.wmt16 as wmt16 +import os WMT16_RECORDIO_FILE = "./wmt16_test_pe.recordio" @@ -149,6 +150,7 @@ def transformer(use_feed): class TestTransformer(TestParallelExecutorBase): @classmethod def setUpClass(cls): + os.environ['CPU_NUM'] = str(4) reader = paddle.batch( wmt16.train(ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size), diff --git a/python/paddle/v2/dataset/flowers.py b/python/paddle/v2/dataset/flowers.py index 6c2c0d5cc50dad04123bf864488457fc91036ffc..357a4e9b000ea81afe291ff39dde2bed5c67e619 100644 --- a/python/paddle/v2/dataset/flowers.py +++ b/python/paddle/v2/dataset/flowers.py @@ -119,7 +119,8 @@ def reader_creator(data_file, yield sample, int(label) - 1 if use_xmap: - return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size) + cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) + return xmap_readers(mapper, reader, cpu_num, buffered_size) else: return map_readers(mapper, reader)