From 495368c2436cfa0bc8f813e821a317c48505ea14 Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Sun, 10 Jun 2018 19:17:50 +0800 Subject: [PATCH] ADD CPU_NUM --- paddle/fluid/framework/details/all_reduce_op_handle.cc | 2 +- python/paddle/dataset/flowers.py | 3 ++- python/paddle/fluid/data_feeder.py | 5 ++++- python/paddle/fluid/parallel_executor.py | 10 +++++++--- .../unittests/test_parallel_executor_fetch_feed.py | 3 +++ .../tests/unittests/test_parallel_executor_mnist.py | 2 ++ .../unittests/test_parallel_executor_seresnext.py | 3 +++ .../test_parallel_executor_test_while_train.py | 2 ++ .../unittests/test_parallel_executor_transformer.py | 2 ++ python/paddle/v2/dataset/flowers.py | 3 ++- 10 files changed, 28 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc index aafb5bf07..74801e2a0 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc @@ -131,7 +131,7 @@ void AllReduceOpHandle::RunImpl() { } } -std::string AllReduceOpHandle::Name() const { return "nccl_all_reduce"; } +std::string AllReduceOpHandle::Name() const { return "all_reduce"; } } // namespace details } // namespace framework } // namespace paddle diff --git a/python/paddle/dataset/flowers.py b/python/paddle/dataset/flowers.py index 0d1eaeed0..527044b41 100644 --- a/python/paddle/dataset/flowers.py +++ b/python/paddle/dataset/flowers.py @@ -119,7 +119,8 @@ def reader_creator(data_file, yield sample, int(label) - 1 if use_xmap: - return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size) + cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) + return xmap_readers(mapper, reader, cpu_num, buffered_size) else: return map_readers(mapper, reader) diff --git a/python/paddle/fluid/data_feeder.py b/python/paddle/fluid/data_feeder.py index 0fd696510..e2013137b 100644 --- a/python/paddle/fluid/data_feeder.py +++ b/python/paddle/fluid/data_feeder.py @@ -15,6 +15,7 @@ from __future__ import print_function import core import numpy +import os import six.moves as six import multiprocessing @@ -150,7 +151,9 @@ class DataFeeder(object): elif isinstance(self.place, core.CUDAPlace): return core.get_cuda_device_count() else: - return min(4, multiprocessing.cpu_count()) + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + return cpu_num def decorate_reader(self, reader, diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py index 70437399d..d5445f0e1 100644 --- a/python/paddle/fluid/parallel_executor.py +++ b/python/paddle/fluid/parallel_executor.py @@ -18,6 +18,7 @@ import framework import executor import warnings import sys +import os __all__ = ['ParallelExecutor', 'ExecutionStrategy', 'BuildStrategy'] @@ -101,7 +102,9 @@ class ParallelExecutor(object): p.set_place(self._act_places[-1]) self._places.append(p) else: - for i in xrange(min(4, multiprocessing.cpu_count())): + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + for i in xrange(cpu_num): p = core.Place() self._act_places.append(core.CPUPlace()) p.set_place(self._act_places[-1]) @@ -118,8 +121,9 @@ class ParallelExecutor(object): # performance. Worth tunning for other models in the future. exec_strategy.num_threads = len(self._places) * 2 else: - exec_strategy.num_threads = min( - len(self._places) * 2, multiprocessing.cpu_count()) + cpu_num = int( + os.environ.get('CPU_NUM', multiprocessing.cpu_count())) + exec_strategy.num_threads = min(len(self._places) * 2, cpu_num) if build_strategy is None: build_strategy = BuildStrategy() diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py index 5c26cd489..3814e035c 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py @@ -18,6 +18,7 @@ import paddle.fluid as fluid import unittest import numpy as np import paddle +import os def Lenet(data, class_dim): @@ -89,6 +90,7 @@ class TestFetchOp(unittest.TestCase): train_inputs.append(tst_reader_iter.next()) self.parallel_exe(train_inputs, seed=1, use_cuda=True) + os.environ['CPU_NUM'] = str(4) self.parallel_exe(train_inputs, seed=1, use_cuda=False) @@ -133,6 +135,7 @@ class TestFeedParallel(unittest.TestCase): def test_feed_op(self): self.parallel_exe(use_cuda=True, seed=1) + os.environ['CPU_NUM'] = str(4) self.parallel_exe(use_cuda=False, seed=1) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py index 3bc846d12..9b11a6991 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py @@ -18,6 +18,7 @@ import numpy as np import paddle import paddle.dataset.mnist as mnist import unittest +import os MNIST_RECORDIO_FILE = "./mnist_test_pe.recordio" @@ -85,6 +86,7 @@ def fc_with_batchnorm(use_feed): class TestMNIST(TestParallelExecutorBase): @classmethod def setUpClass(cls): + os.environ['CPU_NUM'] = str(4) # Convert mnist to recordio file with fluid.program_guard(fluid.Program(), fluid.Program()): reader = paddle.batch(mnist.train(), batch_size=4) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py index ef6f3b99b..e3e497c89 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py @@ -15,6 +15,7 @@ import paddle.fluid as fluid from parallel_executor_test_base import TestParallelExecutorBase import unittest +import os def squeeze_excitation(input, num_channels, reduction_ratio): @@ -145,10 +146,12 @@ class TestResnet(TestParallelExecutorBase): ) def test_resnet(self): + # os.environ['CPU_NUM'] = str(4) self.check_resnet_convergence(False, use_cuda=True) # self.check_resnet_convergence(False,use_cuda=False) def test_resnet_with_new_strategy(self): + os.environ['CPU_NUM'] = str(4) self.check_resnet_convergence(True, use_cuda=True) self.check_resnet_convergence(True, use_cuda=False) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py index 1f2555d97..31ba8c1d6 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_test_while_train.py @@ -15,6 +15,7 @@ import paddle.fluid as fluid import numpy as np import unittest +import os def simple_fc_net(): @@ -36,6 +37,7 @@ def simple_fc_net(): class ParallelExecutorTestingDuringTraining(unittest.TestCase): def check_network_convergence(self, use_cuda, build_strategy=None): + os.environ['CPU_NUM'] = str(4) main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py index 3e2c37191..b6215fddb 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py @@ -19,6 +19,7 @@ from parallel_executor_test_base import TestParallelExecutorBase import unittest import paddle import paddle.dataset.wmt16 as wmt16 +import os WMT16_RECORDIO_FILE = "./wmt16_test_pe.recordio" @@ -149,6 +150,7 @@ def transformer(use_feed): class TestTransformer(TestParallelExecutorBase): @classmethod def setUpClass(cls): + os.environ['CPU_NUM'] = str(4) reader = paddle.batch( wmt16.train(ModelHyperParams.src_vocab_size, ModelHyperParams.trg_vocab_size), diff --git a/python/paddle/v2/dataset/flowers.py b/python/paddle/v2/dataset/flowers.py index 6c2c0d5cc..357a4e9b0 100644 --- a/python/paddle/v2/dataset/flowers.py +++ b/python/paddle/v2/dataset/flowers.py @@ -119,7 +119,8 @@ def reader_creator(data_file, yield sample, int(label) - 1 if use_xmap: - return xmap_readers(mapper, reader, min(4, cpu_count()), buffered_size) + cpu_num = int(os.environ.get('CPU_NUM', cpu_count())) + return xmap_readers(mapper, reader, cpu_num, buffered_size) else: return map_readers(mapper, reader) -- GitLab