Commit 7b723839 authored by chengduoZH

Add CPU tests for parallel_executor_crf and parallel_executor_fetch_feed, and enable these tests

Parent d24e046c
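The changes below follow a single pattern: set the CPU_NUM environment variable so ParallelExecutor knows how many CPU device scopes to create, and thread a use_cuda flag through each test so every convergence check runs once on CUDA and once on CPU. The following is a minimal sketch of that pattern, assuming only the Fluid API visible in this diff (fluid.ParallelExecutor, fluid.CPUPlace, fluid.program_guard); build_network and reader are illustrative placeholders, not functions from this repository:

import os
import numpy as np
import paddle.fluid as fluid

def run_parallel(build_network, reader, use_cuda=True, iters=10):
    # ParallelExecutor derives the number of CPU "devices" from CPU_NUM.
    os.environ['CPU_NUM'] = str(4)

    main, startup = fluid.Program(), fluid.Program()
    with fluid.program_guard(main, startup):
        # build_network is a placeholder: it defines the layers and returns
        # the mean loss plus a DataFeeder over the input layers.
        avg_cost, feeder = build_network()

        place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
        exe = fluid.Executor(place)
        exe.run(startup)

        pe = fluid.ParallelExecutor(use_cuda=use_cuda, loss_name=avg_cost.name)
        data = reader()
        for _ in range(iters):
            loss, = pe.run(feed=feeder.feed(next(data)),
                           fetch_list=[avg_cost.name])
            print(np.array(loss).mean())

Each test touched below exercises this pattern twice, once with use_cuda=True and once with use_cuda=False.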
......@@ -107,18 +107,18 @@ void AllReduceOpHandle::RunImpl() {
     auto &trg = *this->local_scopes_[0]
                      ->FindVar(kLocalExecScopeName)
                      ->Get<Scope *>()
-                     ->FindVar(in_var_handles[0]->name_)
+                     ->FindVar(out_var_handles[0]->name_)
                      ->GetMutable<framework::LoDTensor>();

     // Reduce All Tensor to trg in CPU
     ReduceLoDTensor func(lod_tensors, &trg);
     VisitDataType(ToDataType(lod_tensors[0]->type()), func);

-    for (size_t i = 0; i < local_scopes_.size(); ++i) {
+    for (size_t i = 1; i < local_scopes_.size(); ++i) {
       auto &scope =
           *local_scopes_[i]->FindVar(kLocalExecScopeName)->Get<Scope *>();
       auto &p = places_[i];
-      auto *var = scope.FindVar(in_var_handles[i]->name_);
+      auto *var = scope.FindVar(out_var_handles[i]->name_);
       auto *dev_ctx = dev_ctxes_[p];
       RunAndRecordEvent(p, [&trg, var, dev_ctx, p] {
......
......@@ -37,7 +37,9 @@ struct ReduceLoDTensor {
     PADDLE_ENFORCE_NE(t0.numel(), 0);
     dst_tensor_.Resize(t0.dims());
     T *dst = dst_tensor_.mutable_data<T>(platform::CPUPlace());
+    if (dst != t0.data<T>()) {
       std::copy(t0.data<T>(), t0.data<T>() + t0.numel(), dst);
+    }

     for (size_t i = 1; i < src_tensors_.size(); ++i) {
       auto &t = *src_tensors_[i];
......
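These two hunks go together: with AllReduceOpHandle now reducing into the output variable of scope 0, the destination buffer dst can be the very same memory as the first source tensor t0 (the input and output of an all_reduce are typically the same gradient variable), so an unconditional std::copy would copy a range onto itself. The new guard skips that copy, and the loop above starts at i = 1 because scope 0 already holds the reduced result. Below is a self-contained numpy sketch of the same accumulate-unless-aliased idea, for illustration only; the actual implementation is the C++ functor shown above:

import numpy as np

def reduce_sum_into(dst, tensors):
    # Element-wise sum of `tensors` into `dst`. Skip the initial copy when
    # `dst` already aliases the first tensor's buffer, mirroring the
    # `dst != t0.data<T>()` guard in ReduceLoDTensor.
    t0 = tensors[0]
    if not np.shares_memory(dst, t0):
        np.copyto(dst, t0)
    for t in tensors[1:]:
        dst += t
    return dst

grads = [np.ones((2, 3), dtype=np.float32) for _ in range(4)]
out = grads[0]                  # the output aliases the first input, as in scope 0
reduce_sum_into(out, grads)     # no self-copy; out now holds the element-wise sum (4.0)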
......@@ -41,8 +41,8 @@ function(py_test_modules TARGET_NAME)
 endfunction()

 list(REMOVE_ITEM TEST_OPS test_warpctc_op)
 list(REMOVE_ITEM TEST_OPS test_dist_train)
-list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf)
-list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed)
+#list(REMOVE_ITEM TEST_OPS test_parallel_executor_crf)
+#list(REMOVE_ITEM TEST_OPS test_parallel_executor_fetch_feed)
 # TODO(wuyi): this test hungs on CI, will add it back later
 list(REMOVE_ITEM TEST_OPS test_listen_and_serv_op)
 foreach(TEST_OP ${TEST_OPS})
......
......@@ -17,6 +17,7 @@ import paddle.fluid as fluid
 import unittest
 import paddle
 import numpy as np
+import os

 word_dict, verb_dict, label_dict = conll05.get_dict()
 word_dict_len = len(word_dict)
......@@ -101,7 +102,11 @@ def db_lstm(word, predicate, ctx_n2, ctx_n1, ctx_0, ctx_p1, ctx_p2, mark,

 class TestCRFModel(unittest.TestCase):
-    def check_network_convergence(self, is_sparse, build_strategy=None):
+    def check_network_convergence(self,
+                                  is_sparse,
+                                  build_strategy=None,
+                                  use_cuda=True):
+        os.environ['CPU_NUM'] = str(4)
         main = fluid.Program()
         startup = fluid.Program()
         with fluid.program_guard(main, startup):
......@@ -145,12 +150,12 @@ class TestCRFModel(unittest.TestCase):
                     paddle.dataset.conll05.test(), buf_size=8192),
                 batch_size=16)

-            place = fluid.CUDAPlace(0)
+            place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
             exe = fluid.Executor(place)
             exe.run(startup)

             pe = fluid.ParallelExecutor(
-                use_cuda=True,
+                use_cuda=use_cuda,
                 loss_name=avg_cost.name,
                 build_strategy=build_strategy)
......@@ -172,25 +177,33 @@ class TestCRFModel(unittest.TestCase):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy)
is_sparse=True, build_strategy=build_strategy, use_cuda=True)
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy, use_cuda=False)
def test_update_dense_parameter_all_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.AllReduce
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy)
is_sparse=False, build_strategy=build_strategy, use_cuda=True)
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy, use_cuda=False)
def test_update_sparse_parameter_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy)
is_sparse=True, build_strategy=build_strategy, use_cuda=True)
self.check_network_convergence(
is_sparse=True, build_strategy=build_strategy, use_cuda=False)
def test_update_dense_parameter_reduce(self):
build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy)
is_sparse=False, build_strategy=build_strategy, use_cuda=True)
self.check_network_convergence(
is_sparse=False, build_strategy=build_strategy, use_cuda=False)
if __name__ == '__main__':
......
......@@ -89,8 +89,8 @@ class TestFetchOp(unittest.TestCase):
         for i in range(iters):
             train_inputs.append(tst_reader_iter.next())

-        self.parallel_exe(train_inputs, seed=1, use_cuda=True)
+        os.environ['CPU_NUM'] = str(4)
+        self.parallel_exe(train_inputs, seed=1, use_cuda=True)
+        self.parallel_exe(train_inputs, seed=1, use_cuda=False)
......@@ -134,8 +134,8 @@ class TestFeedParallel(unittest.TestCase):
                 break

     def test_feed_op(self):
-        self.parallel_exe(use_cuda=True, seed=1)
+        os.environ['CPU_NUM'] = str(4)
+        self.parallel_exe(use_cuda=True, seed=1)
+        self.parallel_exe(use_cuda=False, seed=1)
......
......@@ -135,6 +135,8 @@ class TestResnet(TestParallelExecutorBase):
                                   balance_parameter_opt_between_cards,
                                   use_cuda=True,
                                   iter=20):
+        os.environ['CPU_NUM'] = str(4)
         import functools

         batch_size = 2
         self.check_network_convergence(
......@@ -147,12 +149,10 @@ class TestResnet(TestParallelExecutorBase):
         )

     def test_resnet(self):
-        os.environ['CPU_NUM'] = str(4)
         self.check_resnet_convergence(False, use_cuda=True)
         self.check_resnet_convergence(False, use_cuda=False, iter=5)

     def test_resnet_with_new_strategy(self):
-        os.environ['CPU_NUM'] = str(4)
         self.check_resnet_convergence(True, use_cuda=True)
         self.check_resnet_convergence(True, use_cuda=False, iter=5)
......