From 7b723839ef9c48d4b593ae8c51874e1a1ea95225 Mon Sep 17 00:00:00 2001 From: chengduoZH Date: Mon, 11 Jun 2018 13:30:14 +0800 Subject: [PATCH] Add cpu test for parallel_executor_crf executor_fetch_feed, and enable these tests --- .../framework/details/all_reduce_op_handle.cc | 6 ++--- .../framework/details/reduce_and_gather.h | 4 ++- .../fluid/tests/unittests/CMakeLists.txt | 4 +-- .../unittests/test_parallel_executor_crf.py | 27 ++++++++++++++----- .../test_parallel_executor_fetch_feed.py | 4 +-- .../test_parallel_executor_seresnext.py | 4 +-- 6 files changed, 32 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/framework/details/all_reduce_op_handle.cc b/paddle/fluid/framework/details/all_reduce_op_handle.cc index 535bcd46e82..b335d3a0d36 100644 --- a/paddle/fluid/framework/details/all_reduce_op_handle.cc +++ b/paddle/fluid/framework/details/all_reduce_op_handle.cc @@ -107,18 +107,18 @@ void AllReduceOpHandle::RunImpl() { auto &trg = *this->local_scopes_[0] ->FindVar(kLocalExecScopeName) ->Get() - ->FindVar(in_var_handles[0]->name_) + ->FindVar(out_var_handles[0]->name_) ->GetMutable(); // Reduce All Tensor to trg in CPU ReduceLoDTensor func(lod_tensors, &trg); VisitDataType(ToDataType(lod_tensors[0]->type()), func); - for (size_t i = 0; i < local_scopes_.size(); ++i) { + for (size_t i = 1; i < local_scopes_.size(); ++i) { auto &scope = *local_scopes_[i]->FindVar(kLocalExecScopeName)->Get(); auto &p = places_[i]; - auto *var = scope.FindVar(in_var_handles[i]->name_); + auto *var = scope.FindVar(out_var_handles[i]->name_); auto *dev_ctx = dev_ctxes_[p]; RunAndRecordEvent(p, [&trg, var, dev_ctx, p] { diff --git a/paddle/fluid/framework/details/reduce_and_gather.h b/paddle/fluid/framework/details/reduce_and_gather.h index 2b95a284990..a6ffb37313a 100644 --- a/paddle/fluid/framework/details/reduce_and_gather.h +++ b/paddle/fluid/framework/details/reduce_and_gather.h @@ -37,7 +37,9 @@ struct ReduceLoDTensor { PADDLE_ENFORCE_NE(t0.numel(), 0); dst_tensor_.Resize(t0.dims()); T *dst = dst_tensor_.mutable_data(platform::CPUPlace()); - std::copy(t0.data(), t0.data() + t0.numel(), dst); + if (dst != t0.data()) { + std::copy(t0.data(), t0.data() + t0.numel(), dst); + } for (size_t i = 1; i < src_tensors_.size(); ++i) { auto &t = *src_tensors_[i]; diff --git a/python/paddle/fluid/tests/unittests/CMakeLists.txt b/python/paddle/fluid/tests/unittests/CMakeLists.txt index 673bd728718..ab683bc1017 100644 --- a/python/paddle/fluid/tests/unittests/CMakeLists.txt +++ b/python/paddle/fluid/tests/unittests/CMakeLists.txt @@ -41,8 +41,8 @@ function(py_test_modules TARGET_NAME) endfunction() list(REMOVE_ITEM TEST_OPS test_warpctc_op) list(REMOVE_ITEM TEST_OPS test_dist_train) -list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf) -list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed) +#list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf) +#list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed) # TODO(wuyi): this test hungs on CI, will add it back later list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op) foreach(TEST_OP ${TEST_OPS}) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py index 66e138b03f3..163975555ec 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_crf.py @@ -17,6 +17,7 @@ import paddle.fluid as fluid import unittest import paddle import numpy as np +import os word_dict, verb_dict, label_dict = conll05.get_dict() word_dict_len = len(word_dict) @@ -101,7 +102,11 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark, class TestCRFModel(unittest.TestCase): - def check_network_convergence(self, is_sparse, build_strategy=None): + def check_network_convergence(self, + is_sparse, + build_strategy=None, + use_cuda=True): + os.environ['CPU_NUM'] = str(4) main = fluid.Program() startup = fluid.Program() with fluid.program_guard(main, startup): @@ -145,12 +150,12 @@ class TestCRFModel(unittest.TestCase): paddle.dataset.conll05.test(), buf_size=8192), batch_size=16) - place = fluid.CUDAPlace(0) + place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace() exe = fluid.Executor(place) exe.run(startup) pe = fluid.ParallelExecutor( - use_cuda=True, + use_cuda=use_cuda, loss_name=avg_cost.name, build_strategy=build_strategy) @@ -172,25 +177,33 @@ class TestCRFModel(unittest.TestCase): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce self.check_network_convergence( - is_sparse=True, build_strategy=build_strategy) + is_sparse=True, build_strategy=build_strategy, use_cuda=True) + self.check_network_convergence( + is_sparse=True, build_strategy=build_strategy, use_cuda=False) def test_update_dense_parameter_all_reduce(self): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce self.check_network_convergence( - is_sparse=False, build_strategy=build_strategy) + is_sparse=False, build_strategy=build_strategy, use_cuda=True) + self.check_network_convergence( + is_sparse=False, build_strategy=build_strategy, use_cuda=False) def test_update_sparse_parameter_reduce(self): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=True, build_strategy=build_strategy) + is_sparse=True, build_strategy=build_strategy, use_cuda=True) + self.check_network_convergence( + is_sparse=True, build_strategy=build_strategy, use_cuda=False) def test_update_dense_parameter_reduce(self): build_strategy = fluid.BuildStrategy() build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce self.check_network_convergence( - is_sparse=False, build_strategy=build_strategy) + is_sparse=False, build_strategy=build_strategy, use_cuda=True) + self.check_network_convergence( + is_sparse=False, build_strategy=build_strategy, use_cuda=False) if __name__ == '__main__': diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py index 3814e035cfc..79702475cca 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_fetch_feed.py @@ -89,8 +89,8 @@ class TestFetchOp(unittest.TestCase): for i in range(iters): train_inputs.append(tst_reader_iter.next()) - self.parallel_exe(train_inputs, seed=1, use_cuda=True) os.environ['CPU_NUM'] = str(4) + self.parallel_exe(train_inputs, seed=1, use_cuda=True) self.parallel_exe(train_inputs, seed=1, use_cuda=False) @@ -134,8 +134,8 @@ class TestFeedParallel(unittest.TestCase): break def test_feed_op(self): - self.parallel_exe(use_cuda=True, seed=1) os.environ['CPU_NUM'] = str(4) + self.parallel_exe(use_cuda=True, seed=1) self.parallel_exe(use_cuda=False, seed=1) diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py index d178b77bed8..066299e6c6f 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py @@ -135,6 +135,8 @@ class TestResnet(TestParallelExecutorBase): balance_parameter_opt_between_cards, use_cuda=True, iter=20): + os.environ['CPU_NUM'] = str(4) + import functools batch_size = 2 self.check_network_convergence( @@ -147,12 +149,10 @@ class TestResnet(TestParallelExecutorBase): ) def test_resnet(self): - os.environ['CPU_NUM'] = str(4) self.check_resnet_convergence(False, use_cuda=True) self.check_resnet_convergence(False, use_cuda=False, iter=5) def test_resnet_with_new_strategy(self): - os.environ['CPU_NUM'] = str(4) self.check_resnet_convergence(True, use_cuda=True) self.check_resnet_convergence(True, use_cuda=False, iter=5) -- GitLab