From 203027ca860368385ae545149694ae565c381f52 Mon Sep 17 00:00:00 2001 From: sneaxiy Date: Fri, 2 Nov 2018 08:22:02 +0000 Subject: [PATCH] test=develop --- .../fluid/framework/details/build_strategy.h | 2 +- .../details/sequential_execution_pass.cc | 14 ++++++- .../unittests/parallel_executor_test_base.py | 4 +- .../test_parallel_executor_seresnext.py | 40 +++++++++++++++++++ .../test_parallel_executor_transformer.py | 2 + 5 files changed, 59 insertions(+), 3 deletions(-) diff --git a/paddle/fluid/framework/details/build_strategy.h b/paddle/fluid/framework/details/build_strategy.h index 3f0a7cb1f..88459320b 100644 --- a/paddle/fluid/framework/details/build_strategy.h +++ b/paddle/fluid/framework/details/build_strategy.h @@ -69,7 +69,7 @@ struct BuildStrategy { bool enable_data_balance_{false}; - bool enable_sequential_execution_{true}; + bool enable_sequential_execution_{false}; bool fuse_broadcast_op_{false}; diff --git a/paddle/fluid/framework/details/sequential_execution_pass.cc b/paddle/fluid/framework/details/sequential_execution_pass.cc index 649bdb098..cc2c8bfef 100644 --- a/paddle/fluid/framework/details/sequential_execution_pass.cc +++ b/paddle/fluid/framework/details/sequential_execution_pass.cc @@ -13,6 +13,7 @@ // limitations under the License. #include "paddle/fluid/framework/details/sequential_execution_pass.h" +#include #include #include #include @@ -29,6 +30,15 @@ static bool IsSameOpDesc(OpDesc *op1, OpDesc *op2) { std::unique_ptr SequentialExecutionPass::ApplyImpl( std::unique_ptr graph) const { + // FIXME(zjl): Insert dependencies between some distributed ops may cause + // the multi_devices_graph_pass fails. So we skip these ops here. + // Indeed, maybe we should not insert dependencies between these ops + // casually, which may cause deadlock easily. + // We should add more skipped distributed ops when found errors in + // multi_devices_graph_pass + static std::unordered_set skip_dist_ops{ + "send", "recv", "send_barrier", "fetch_barrier"}; + auto &ops = Get>(kAllOpDescs); std::vector op_node_list; op_node_list.reserve(ops.size()); @@ -73,7 +83,9 @@ std::unique_ptr SequentialExecutionPass::ApplyImpl( } } ready_ops.erase(found_node); - op_node_list.push_back(found_node); + if (skip_dist_ops.count(op_desc->Type()) == 0) { + op_node_list.push_back(found_node); + } } for (size_t i = 1; i < op_node_list.size(); ++i) { diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py index ee291fe74..a3fe5e0a0 100644 --- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py +++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py @@ -40,7 +40,8 @@ class TestParallelExecutorBase(unittest.TestCase): use_reduce=False, fuse_elewise_add_act_ops=False, optimizer=fluid.optimizer.Adam, - use_fast_executor=False): + use_fast_executor=False, + enable_sequential_execution=False): def run_executor(exe, feed, fetch_list, program=None): if isinstance(exe, fluid.ParallelExecutor): res = exe.run(fetch_list=fetch_list, feed=feed) @@ -80,6 +81,7 @@ class TestParallelExecutorBase(unittest.TestCase): build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \ if use_reduce else fluid.BuildStrategy.ReduceStrategy.AllReduce build_strategy.fuse_elewise_add_act_ops = fuse_elewise_add_act_ops + build_strategy.enable_sequential_execution = enable_sequential_execution if use_parallel_executor: exe = fluid.ParallelExecutor( diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py index cc2d692e1..e7a56bb63 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_seresnext.py @@ -232,6 +232,46 @@ class TestResnet(TestParallelExecutorBase): for loss in zip(all_reduce_last_loss, reduce_last_loss): self.assertAlmostEquals(loss[0], loss[1], delta=delta2) + if not use_cuda: + return + + all_reduce_first_loss_seq, all_reduce_last_loss_seq = self.check_network_convergence( + model, + feed_dict={"image": img, + "label": label}, + iter=iter, + batch_size=batch_size, + use_cuda=use_cuda, + use_reduce=False, + optimizer=optimizer, + enable_sequential_execution=True) + + reduce_first_loss_seq, reduce_last_loss_seq = self.check_network_convergence( + model, + feed_dict={"image": img, + "label": label}, + iter=iter, + batch_size=batch_size, + use_cuda=use_cuda, + use_reduce=True, + optimizer=optimizer, + enable_sequential_execution=True) + + for loss in zip(all_reduce_first_loss, all_reduce_first_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + for loss in zip(all_reduce_last_loss, all_reduce_last_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=delta2) + + for loss in zip(reduce_first_loss, reduce_first_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + for loss in zip(reduce_last_loss, reduce_last_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=delta2) + + for loss in zip(all_reduce_first_loss_seq, reduce_first_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=1e-6) + for loss in zip(all_reduce_last_loss_seq, reduce_last_loss_seq): + self.assertAlmostEquals(loss[0], loss[1], delta=delta2) + def _check_resnet_convergence(self, model, use_cuda=True, diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py index a55b2002e..382774390 100644 --- a/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py +++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_transformer.py @@ -173,6 +173,8 @@ class TestTransformer(TestParallelExecutorBase): def test_main(self): if core.is_compiled_with_cuda(): self.check_network_convergence(transformer, use_cuda=True) + self.check_network_convergence( + transformer, use_cuda=True, enable_sequential_execution=True) self.check_network_convergence(transformer, use_cuda=False, iter=5) -- GitLab