diff --git a/paddle/fluid/framework/ir/graph_helper.cc b/paddle/fluid/framework/ir/graph_helper.cc
index c54766d95a61ac1a4b61566c6de62cbc86685a1d..01e878089171e4620f32b57a65d92d1c86d307db 100644
--- a/paddle/fluid/framework/ir/graph_helper.cc
+++ b/paddle/fluid/framework/ir/graph_helper.cc
@@ -120,19 +120,25 @@ size_t GraphNum(const Graph &graph) {
   std::deque<ir::Node *> q_nodes;
   std::vector<std::unordered_set<ir::Node *>> graph_nodes;
   std::unordered_set<ir::Node *> g_nodes;
+  // q_set is used to record the nodes currently in the queue.
+  std::unordered_set<ir::Node *> q_set;
   size_t graph_count = 0;
 
-  auto traverse_nodes = [&visited_nodes,
-                         &q_nodes](const std::vector<ir::Node *> &nodes) {
-    std::copy_if(
-        nodes.begin(), nodes.end(), std::back_inserter(q_nodes),
-        [&visited_nodes](Node *node) { return !visited_nodes.count(node); });
+  auto traverse_nodes = [&visited_nodes, &q_nodes,
+                         &q_set](const std::vector<ir::Node *> &nodes) {
+    for (auto n : nodes) {
+      if (visited_nodes.count(n) == 0 && q_set.count(n) == 0) {
+        q_nodes.push_back(n);
+        q_set.insert(n);
+      }
+    }
   };
 
   while (visited_nodes.size() != nodes.size()) {
     if (!q_nodes.empty()) {
       auto cur_node = q_nodes.front();
       q_nodes.pop_front();
+      q_set.erase(cur_node);
       visited_nodes.insert(cur_node);
       g_nodes.insert(cur_node);
       traverse_nodes(cur_node->inputs);
@@ -146,6 +152,7 @@ size_t GraphNum(const Graph &graph) {
       for (auto &n : nodes) {
         if (visited_nodes.count(n) == 0) {
           q_nodes.push_back(n);
+          q_set.insert(n);
           break;
         }
       }
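Why the new q_set matters: with the old copy_if version, a node reachable from several not-yet-dequeued neighbors is appended to q_nodes once per neighbor, so the queue can grow far beyond the node count and already-visited nodes get re-processed. Below is a minimal standalone sketch of the same in-queue-set pattern, counting connected components over a plain adjacency list; the ComponentCount/in_queue names and the int-indexed graph are illustrative only, not Paddle's types.

// Standalone sketch of the dedup pattern above, assuming a plain
// adjacency-list graph; the types here are illustrative, not Paddle's.
#include <cstddef>
#include <deque>
#include <iostream>
#include <unordered_set>
#include <vector>

// Counts connected components with BFS. Without the in-queue set, a node
// reachable through several neighbors is pushed once per neighbor; the set
// guarantees each node is enqueued at most once.
size_t ComponentCount(const std::vector<std::vector<int>> &adj) {
  std::unordered_set<int> visited;
  std::unordered_set<int> in_queue;  // plays the role of q_set
  std::deque<int> queue;
  size_t count = 0;
  for (int start = 0; start < static_cast<int>(adj.size()); ++start) {
    if (visited.count(start)) continue;
    ++count;  // a new, previously unreached component
    queue.push_back(start);
    in_queue.insert(start);
    while (!queue.empty()) {
      int cur = queue.front();
      queue.pop_front();
      in_queue.erase(cur);
      visited.insert(cur);
      for (int next : adj[cur]) {
        if (visited.count(next) == 0 && in_queue.count(next) == 0) {
          queue.push_back(next);
          in_queue.insert(next);
        }
      }
    }
  }
  return count;
}

int main() {
  // Two components: {0, 1, 2} and {3, 4}.
  std::vector<std::vector<int>> adj = {{1, 2}, {0, 2}, {0, 1}, {4}, {3}};
  std::cout << ComponentCount(adj) << "\n";  // prints 2
}

The deque plus unordered_set pair keeps FIFO order while giving O(1) amortized membership tests, which is why the patch tracks queue membership in a side set instead of scanning the deque.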
diff --git a/paddle/fluid/framework/op_proto_maker.h b/paddle/fluid/framework/op_proto_maker.h
index 5527783faad6f7ea6e0b9e776cbde3c743277f16..678c14a44b5c259ddc6f914b3fbb5b50e649993c 100644
--- a/paddle/fluid/framework/op_proto_maker.h
+++ b/paddle/fluid/framework/op_proto_maker.h
@@ -28,12 +28,12 @@ enum class OpRole {
   kBackward = 0x0001,
   kOptimize = 0x0002,
   // RPC role is for send/recv related op
-  kRPC = 0x0003,
+  kRPC = 0x0004,
   // Dist role is for split_byref/split_selected_rows/concat
   // used for distributed training.
-  kDist = 0x0004,
+  kDist = 0x0008,
   // Tag all learning rate scheduler operators.
-  kLRSched = 0x0005,
+  kLRSched = 0x0010,
 
   kLoss = 0x0100,
   // The default value of op's role. This should be only used for unittests and
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 7dad872dd04754d2866c361b74ab1236ff743da5..3368ae2ee4cf65b85abf1fcd89dee14f43522e1f 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -156,6 +156,12 @@ ParallelExecutor::ParallelExecutor(
         params, member_->local_scopes_, member_->use_cuda_);
 #endif
 
+  // If the loss_var_name is given, the number of graphs must be exactly one.
+  if (loss_var_name.size()) {
+    PADDLE_ENFORCE_EQ(ir::GraphNum(*graph), 1,
+                      "The number of graphs should be one");
+  }
+
   if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
     member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
         exec_strategy, member_->local_scopes_, places, std::move(graph)));
diff --git a/python/paddle/fluid/tests/unittests/test_dist_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_ctr.py
index 3575fd07fc727bd6c6b07a19a60b1df6656ae9e2..390393e04f8a1ff7b994da66cf1fa104ccb61793 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_ctr.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_ctr.py
@@ -23,9 +23,8 @@ class TestDistCTR2x2(TestDistBase):
         self._sync_mode = True
         self._enforce_place = "CPU"
 
-
-def test_dist_ctr(self):
-    self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
+    def test_dist_ctr(self):
+        self.check_with_place("dist_ctr.py", delta=1e-7, check_error_log=False)
 
 
 if __name__ == "__main__":
diff --git a/python/paddle/fluid/tests/unittests/test_dist_mnist.py b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
index 94b66a40233be4378e1a003f01d9375d00794743..f65dd7e2a28c4ace3988c0cc1267ebe981fbd9cb 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_mnist.py
@@ -40,8 +40,7 @@ class TestDistMnistAsync(TestDistBase):
         self._sync_mode = False
         self._use_reduce = False
 
-    # FIXME(typhoonzero): fix async mode test later
-    def no_test_dist_train(self):
+    def test_dist_train(self):
         self.check_with_place("dist_mnist.py", delta=200)
 
 
diff --git a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
index c1e60dc9e420d11677468e0c62357437ecdf9e35..c0989ca709e100d8f147a08970b0e858c81ce09b 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_se_resnext.py
@@ -40,8 +40,7 @@ class TestDistSeResneXt2x2Async(TestDistBase):
         self._sync_mode = False
         self._use_reader_alloc = False
 
-    #FIXME(typhoonzero): fix async mode later
-    def no_test_dist_train(self):
+    def test_dist_train(self):
         self.check_with_place("dist_se_resnext.py", delta=100)
 
 
diff --git a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py
index e1e6ef61090dfb439a3b43c4baf5ba88f61310ba..fcf793da07eb8985fb19c9e36ff9b7d5a8f51212 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_simnet_bow.py
@@ -79,8 +79,7 @@ class TestDistSimnetBow2x2SparseAsync(TestDistBase):
         self._sync_mode = False
         self._enforce_place = "CPU"
 
-    #FIXME(typhoonzero): fix async tests later
-    def no_test_simnet_bow(self):
+    def test_simnet_bow(self):
         need_envs = {
             "IS_DISTRIBUTED": '0',
             "IS_SPARSE": '1',
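A note on the OpRole renumbering in op_proto_maker.h: the role values are used as bit flags, since an operator can carry several roles at once (Paddle's Python side OR's kLoss together with the forward/backward role on loss-related ops), so every role needs its own bit. With the old sequential values, kRPC = 0x0003 is indistinguishable from kBackward | kOptimize. The sketch below illustrates the invariant; Combine and HasRole are hypothetical helper names for illustration, not Paddle APIs.

// Sketch of the bit-flag invariant behind the new values; assumes roles are
// OR-combined as in the kLoss case. Combine/HasRole are illustrative helpers,
// not Paddle functions.
#include <cstdint>
#include <iostream>

enum class OpRole : uint32_t {
  kForward = 0x0000,
  kBackward = 0x0001,
  kOptimize = 0x0002,
  kRPC = 0x0004,      // was 0x0003 == kBackward | kOptimize: ambiguous
  kDist = 0x0008,     // was 0x0004: not a distinct bit from the new kRPC
  kLRSched = 0x0010,  // was 0x0005: not a single bit
  kLoss = 0x0100,
};

// Pack two roles into one attribute value via bitwise OR.
uint32_t Combine(OpRole a, OpRole b) {
  return static_cast<uint32_t>(a) | static_cast<uint32_t>(b);
}

// Test whether a combined value carries a given role bit.
bool HasRole(uint32_t roles, OpRole r) {
  return (roles & static_cast<uint32_t>(r)) != 0;
}

int main() {
  // An op tagged as both backward and loss (the combination is illustrative).
  uint32_t roles = Combine(OpRole::kBackward, OpRole::kLoss);
  std::cout << HasRole(roles, OpRole::kBackward) << "\n";  // 1
  std::cout << HasRole(roles, OpRole::kLoss) << "\n";      // 1
  // With the old values, kRPC == 0x0003 == kBackward | kOptimize, so a
  // backward+optimize op would falsely test positive for kRPC; with
  // power-of-two values the test below correctly prints 0.
  std::cout << HasRole(roles, OpRole::kRPC) << "\n";  // 0
  return 0;
}

This is also why kLRSched lands on 0x0010 rather than the next integer: each role except the kForward default must occupy a bit that no other role, or OR of roles, can produce.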