diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc
index 74801e2a0d338048add64d155ca8bcb2bd404cc3..f4c21c795855df71c46bb81e92223fd3b6b01706 100644
--- a/paddle/fluid/framework/details/all_reduce_op_handle.cc
+++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc
@@ -67,7 +67,7 @@ void AllReduceOpHandle::RunImpl() {
     if (platform::is_gpu_place(lod_tensors[0]->place())) {
 #ifdef PADDLE_WITH_CUDA
-      PADDLE_ENFORCE(nccl_ctxs_);
+      PADDLE_ENFORCE(nccl_ctxs_, "nccl_ctxs should not be nullptr.");
       int dtype = -1;
       size_t numel = 0;
       std::vector<std::function<void()>> all_reduce_calls;
diff --git a/python/paddle/fluid/parallel_executor.py b/python/paddle/fluid/parallel_executor.py
index d5445f0e17e87464bfc41635b1f565fa7a9ec64d..11e4353d44c5716fa3b877828774339ac156641a 100644
--- a/python/paddle/fluid/parallel_executor.py
+++ b/python/paddle/fluid/parallel_executor.py
@@ -119,11 +119,10 @@ class ParallelExecutor(object):
             if use_cuda:
                 # Experiments on se-resnext shows that too many threads hurt
                 # performance. Worth tunning for other models in the future.
-                exec_strategy.num_threads = len(self._places) * 2
+                exec_strategy.num_threads = len(self._places) * 4
             else:
-                cpu_num = int(
-                    os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
-                exec_strategy.num_threads = min(len(self._places) * 2, cpu_num)
+                # Currently num_threads must be 1.
+                exec_strategy.num_threads = 1
 
         if build_strategy is None:
             build_strategy = BuildStrategy()
diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
index 566b676777cc329dce02f1875abf0d72176c1c00..829c5a1a5fd099543e9e98b9587d4f316a91b587 100644
--- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
+++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
@@ -12,6 +12,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import multiprocessing
+import os
 import unittest
 import paddle.fluid as fluid
 import time
@@ -73,7 +75,9 @@ class TestParallelExecutorBase(unittest.TestCase):
                 exe = fluid.Executor(place=place)
 
             if batch_size is not None:
-                batch_size *= fluid.core.get_cuda_device_count()
+                batch_size *= fluid.core.get_cuda_device_count(
+                ) if use_cuda else int(
+                    os.environ.get('CPU_NUM', multiprocessing.cpu_count()))
             begin = time.time()
             first_loss, = run_executor(
                 exe=exe, feed=feed_dict, fetch_list=[loss.name])
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
index 9b11a69912e6f6d7deffcc67502ce4b161c44ea3..a801d99aa1ced35eb7f081fde63ad541f0eb2589 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
@@ -104,8 +104,9 @@ class TestMNIST(TestParallelExecutorBase):
     def check_simple_fc_convergence(self,
                                     balance_parameter_opt_between_cards,
                                     use_cuda=True):
-        self.check_network_convergence(simple_fc_net)
-        self.check_network_convergence(simple_fc_net, allow_op_delay=True)
+        self.check_network_convergence(simple_fc_net, use_cuda=use_cuda)
+        self.check_network_convergence(
+            simple_fc_net, use_cuda=use_cuda, allow_op_delay=True)
 
         img = np.zeros(shape=[32, 784], dtype='float32')
         label = np.ones(shape=[32, 1], dtype='int64')
@@ -142,6 +143,7 @@ class TestMNIST(TestParallelExecutorBase):
             seed=1000,
             feed_dict={"image": img,
                        "label": label},
+            use_cuda=use_cuda,
             use_parallel_executor=True,
             balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
         )
@@ -161,7 +163,7 @@ class TestMNIST(TestParallelExecutorBase):
 
     def check_batchnorm_fc_convergence(
             self, balance_parameter_opt_between_cards, use_cuda):
-        self.check_network_convergence(fc_with_batchnorm)
+        self.check_network_convergence(fc_with_batchnorm, use_cuda=use_cuda)
         img = np.zeros(shape=[32, 784], dtype='float32')
         label = np.ones(shape=[32, 1], dtype='int64')
         self.check_network_convergence(
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
index e3e497c899f9e6ee4c2218f3414ba2e1bc89123c..d178b77bed876f4ae57c8d170922576781ab313f 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py
@@ -133,27 +133,28 @@ def SE_ResNeXt50Small(batch_size=2, use_feed=False):
 class TestResnet(TestParallelExecutorBase):
     def check_resnet_convergence(self,
                                  balance_parameter_opt_between_cards,
-                                 use_cuda=True):
+                                 use_cuda=True,
+                                 iter=20):
         import functools
         batch_size = 2
         self.check_network_convergence(
             functools.partial(
                 SE_ResNeXt50Small, batch_size=batch_size),
-            iter=20,
+            iter=iter,
             batch_size=batch_size,
             use_cuda=use_cuda,
             balance_parameter_opt_between_cards=balance_parameter_opt_between_cards
         )
 
     def test_resnet(self):
-        # os.environ['CPU_NUM'] = str(4)
+        os.environ['CPU_NUM'] = str(4)
         self.check_resnet_convergence(False, use_cuda=True)
-        # self.check_resnet_convergence(False,use_cuda=False)
+        self.check_resnet_convergence(False, use_cuda=False, iter=5)
 
     def test_resnet_with_new_strategy(self):
         os.environ['CPU_NUM'] = str(4)
         self.check_resnet_convergence(True, use_cuda=True)
-        self.check_resnet_convergence(True, use_cuda=False)
+        self.check_resnet_convergence(True, use_cuda=False, iter=5)
 
 
 if __name__ == '__main__':
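
Not part of the patch: a minimal, framework-free sketch of the batch-size scaling that the parallel_executor_test_base.py hunk introduces. The helper name device_count and its cuda_device_count parameter are illustrative stand-ins for fluid.core.get_cuda_device_count(); only the CPU_NUM fallback logic mirrors the change above.

import multiprocessing
import os


def device_count(use_cuda, cuda_device_count=1):
    # On GPU the tests scale batch_size by the number of CUDA devices;
    # on CPU they scale by CPU_NUM, falling back to the host core count.
    if use_cuda:
        return cuda_device_count  # stand-in for fluid.core.get_cuda_device_count()
    return int(os.environ.get('CPU_NUM', multiprocessing.cpu_count()))


if __name__ == '__main__':
    os.environ['CPU_NUM'] = str(4)  # what the updated tests export
    batch_size = 2                  # per-device batch size used in the SE-ResNeXt test
    print(batch_size * device_count(use_cuda=False))  # prints 8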