From 9da9b1926b2bf05467c11a914820b2dd1a9250e6 Mon Sep 17 00:00:00 2001 From: Wu Yi Date: Sun, 28 Oct 2018 11:39:05 +0800 Subject: [PATCH] [1.1] fix graph num hang (#14072) * fix graph num hang test=develop * re-enable tests test=develop * re-enable graph num check test=develop * fix multi device pass role check test=develop --- paddle/fluid/framework/ir/graph_helper.cc | 17 ++++++++++++----- paddle/fluid/framework/op_proto_maker.h | 6 +++--- paddle/fluid/framework/parallel_executor.cc | 6 ++++++ .../fluid/tests/unittests/test_dist_ctr.py | 5 ++--- .../fluid/tests/unittests/test_dist_mnist.py | 3 +-- .../tests/unittests/test_dist_se_resnext.py | 3 +-- .../tests/unittests/test_dist_simnet_bow.py | 3 +-- 7 files changed, 26 insertions(+), 17 deletions(-) diff --git a/paddle/fluid/framework/ir/graph_helper.cc b/paddle/fluid/framework/ir/graph_helper.cc index c54766d95a6..01e87808917 100644 --- a/paddle/fluid/framework/ir/graph_helper.cc +++ b/paddle/fluid/framework/ir/graph_helper.cc @@ -120,19 +120,25 @@ size_t GraphNum(const Graph &graph) { std::deque q_nodes; std::vector> graph_nodes; std::unordered_set g_nodes; + // q_set used to record records in the queue. + std::unordered_set q_set; size_t graph_count = 0; - auto traverse_nodes = [&visited_nodes, - &q_nodes](const std::vector &nodes) { - std::copy_if( - nodes.begin(), nodes.end(), std::back_inserter(q_nodes), - [&visited_nodes](Node *node) { return !visited_nodes.count(node); }); + auto traverse_nodes = [&visited_nodes, &q_nodes, + &q_set](const std::vector &nodes) { + for (auto n : nodes) { + if (visited_nodes.count(n) == 0 && q_set.count(n) == 0) { + q_nodes.push_back(n); + q_set.insert(n); + } + } }; while (visited_nodes.size() != nodes.size()) { if (!q_nodes.empty()) { auto cur_node = q_nodes.front(); q_nodes.pop_front(); + q_set.erase(cur_node); visited_nodes.insert(cur_node); g_nodes.insert(cur_node); traverse_nodes(cur_node->inputs); @@ -146,6 +152,7 @@ size_t GraphNum(const Graph &graph) { for (auto &n : nodes) { if (visited_nodes.count(n) == 0) { q_nodes.push_back(n); + q_set.insert(n); break; } } diff --git a/paddle/fluid/framework/op_proto_maker.h b/paddle/fluid/framework/op_proto_maker.h index 5527783faad..678c14a44b5 100644 --- a/paddle/fluid/framework/op_proto_maker.h +++ b/paddle/fluid/framework/op_proto_maker.h @@ -28,12 +28,12 @@ enum class OpRole { kBackward = 0x0001, kOptimize = 0x0002, // RPC role is for send/recv releated op - kRPC = 0x0003, + kRPC = 0x0004, // Dist role is for split_byref/split_selected_rows/concat // used for distributed training. - kDist = 0x0004, + kDist = 0x0008, // Tag all learning rate scheduler operators. - kLRSched = 0x0005, + kLRSched = 0x0016, kLoss = 0x0100, // The default value of op's role. This should be only used for unittests and diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc index 7dad872dd04..3368ae2ee4c 100644 --- a/paddle/fluid/framework/parallel_executor.cc +++ b/paddle/fluid/framework/parallel_executor.cc @@ -156,6 +156,12 @@ ParallelExecutor::ParallelExecutor( params, member_->local_scopes_, member_->use_cuda_); #endif + // If the loss_var_name is given, the number of graph should be only one. + if (loss_var_name.size()) { + PADDLE_ENFORCE_EQ(ir::GraphNum(*graph), 1, + "The number of graph should be only one"); + } + if (exec_strategy.type_ == ExecutionStrategy::kDefault) { member_->executor_.reset(new details::ThreadedSSAGraphExecutor( exec_strategy, member_->local_scopes_, places, std::move(graph))); diff --git a/python/paddle/fluid/tests/unittests/test_dist_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_ctr.py index 3575fd07fc7..390393e04f8 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_ctr.py +++ b/python/paddle/fluid/tests/unittests/test_dist_ctr.py @@ -23,9 +23,8 @@ class TestDistCTR2x2(TestDistBase): self._sync_mode = True self._enforce_place = "CPU" - -def test_dist_ctr(self): - self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False) + def test_dist_ctr(self): + self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False) if __name__ == "__main__": diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py index 94b66a40233..f65dd7e2a28 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py +++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py @@ -40,8 +40,7 @@ class TestDistMnistAsync(TestDistBase): self._sync_mode = False self._use_reduce = False - # FIXME(typhoonzero): fix async mode test later - def no_test_dist_train(self): + def test_dist_train(self): self.check_with_place("dist_mnist.py", delta=200) diff --git a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py index c1e60dc9e42..c0989ca709e 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py +++ b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py @@ -40,8 +40,7 @@ class TestDistSeResneXt2x2Async(TestDistBase): self._sync_mode = False self._use_reader_alloc = False - #FIXME(typhoonzero): fix async mode later - def no_test_dist_train(self): + def test_dist_train(self): self.check_with_place("dist_se_resnext.py", delta=100) diff --git a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py index e1e6ef61090..fcf793da07e 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py +++ b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py @@ -79,8 +79,7 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase): self._sync_mode = False self._enforce_place = "CPU" - #FIXME(typhoonzero): fix async tests later - def no_test_simnet_bow(self): + def test_simnet_bow(self): need_envs = { "IS_DISTRIBUTED": '0', "IS_SPARSE": '1', -- GitLab