From a8f85f2c8fae48ff7fc3062327e31a0acb840261 Mon Sep 17 00:00:00 2001
From: tangwei12
Date: Thu, 13 Feb 2020 13:43:43 +0800
Subject: [PATCH] fix bug with compiledProgram (#22495) (#22566)

* add thread barrier for the compiled program

---
 .../framework/details/async_ssa_graph_executor.cc   |  6 ++++++
 paddle/fluid/framework/details/execution_strategy.h |  1 +
 .../details/threaded_ssa_graph_executor.cc          | 12 ++++++++++++
 paddle/fluid/pybind/pybind.cc                       |  8 ++++++++
 .../distribute_transpiler/distributed_strategy.py   |  3 ++-
 .../paddle/fluid/tests/unittests/dist_fleet_ctr.py  |  9 ++++++++-
 .../fluid/tests/unittests/test_dist_fleet_base.py   | 14 ++++----------
 .../fluid/tests/unittests/test_dist_fleet_ctr.py    | 12 +++++++-----
 8 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/paddle/fluid/framework/details/async_ssa_graph_executor.cc b/paddle/fluid/framework/details/async_ssa_graph_executor.cc
index fc0889fdc42..5f3b10a3b61 100644
--- a/paddle/fluid/framework/details/async_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/async_ssa_graph_executor.cc
@@ -168,6 +168,12 @@ FeedFetchList AsyncSSAGraphExecutor::Run(
     const std::vector<std::string> &fetch_tensors) {
   // init once
   if (run_futures_.size() == 0 && places_.size() > 1) {
+    if (strategy_.thread_barrier_) {
+#ifdef PADDLE_WITH_DISTRIBUTE
+      operators::distributed::Communicator::GetInstance()->BarrierTriggerReset(
+          places_.size());
+#endif
+    }
     exception_holder_.Clear();
     StartOffPythonTrainLoop();
   }
diff --git a/paddle/fluid/framework/details/execution_strategy.h b/paddle/fluid/framework/details/execution_strategy.h
index b44e6b6a75a..a6936577c57 100644
--- a/paddle/fluid/framework/details/execution_strategy.h
+++ b/paddle/fluid/framework/details/execution_strategy.h
@@ -36,6 +36,7 @@ struct ExecutionStrategy {
   ExecutorType type_{kExperimental};
   // This debug option.
   bool dry_run_{false};
+  bool thread_barrier_{false};

   // only use with async_ssa_graph_executor
   // and pyreader with data queue
diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
index 5ee47a3933b..4e1808d2e7b 100644
--- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc
@@ -16,6 +16,10 @@
 #include "paddle/fluid/framework/ir/graph_helper.h"
 #include "paddle/fluid/platform/profiler.h"

+#ifdef PADDLE_WITH_DISTRIBUTE
+#include "paddle/fluid/operators/distributed/communicator.h"
+#endif
+
 namespace paddle {
 namespace framework {
 namespace details {
@@ -332,8 +336,16 @@ bool ThreadedSSAGraphExecutor::RunOpSync(OpHandleBase *op) {

 void ThreadedSSAGraphExecutor::ExecutionFinal(
     std::vector<OpHandleBase *> *fetch_ops) {
+#ifdef PADDLE_WITH_DISTRIBUTE
+  if (strategy_.thread_barrier_) {
+    operators::distributed::Communicator::GetInstance()
+        ->BarrierTriggerDecrement();
+  }
+#endif
+
   VLOG(3) << "caught exception " << exception_holder_.Type() << ", rethrow it";
   ClearFetchOp(graph_, fetch_ops);
+
   exception_holder_.ReThrow();
 }
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index c0ec77b2d5c..796a56975f4 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -1732,6 +1732,14 @@ All parameter, weight, gradient are variables in Paddle.
R"DOC(This config that how many iteration the executor will run when user call exe.run() in python )DOC") + .def_property( + "use_thread_barrier", + [](const ExecutionStrategy &self) { return self.thread_barrier_; }, + [](ExecutionStrategy &self, bool use_thread_barrier) { + self.thread_barrier_ = use_thread_barrier; + }, + R"DOC(This config that the this is distributed training with parameter server + )DOC") .def_property("_dry_run", [](const ExecutionStrategy &self) { return self.dry_run_; }, [](ExecutionStrategy &self, bool dry_run) { diff --git a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py index 05f920f426f..2eb69d76e41 100644 --- a/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py +++ b/python/paddle/fluid/incubate/fleet/parameter_server/distribute_transpiler/distributed_strategy.py @@ -196,8 +196,9 @@ class HalfAsyncStrategy(DistributedStrategy): super(HalfAsyncStrategy, self).__init__() self._program_config.sync_mode = False self._program_config.runtime_split_send_recv = True - self._build_strategy.async_mode = True self._program_config.half_async = True + self._build_strategy.async_mode = True + self._execute_strategy.use_thread_barrier = True class GeoStrategy(DistributedStrategy): diff --git a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py index 0e3186f9eca..df50853360c 100644 --- a/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py +++ b/python/paddle/fluid/tests/unittests/dist_fleet_ctr.py @@ -39,7 +39,7 @@ class TestDistCTR2x2(FleetDistRunnerBase): For test CTR model, using Fleet api """ - def net(self, batch_size=4, lr=0.01): + def net(self, args, batch_size=4, lr=0.01): """ network definition @@ -72,6 +72,13 @@ class TestDistCTR2x2(FleetDistRunnerBase): datas = [dnn_data, lr_data, label] + if args.reader == "pyreader": + self.reader = fluid.io.PyReader( + feed_list=datas, + capacity=64, + iterable=False, + use_double_buffer=False) + # build dnn model dnn_layer_dims = [128, 128, 64, 32, 1] dnn_embedding = fluid.layers.embedding( diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py index 8c7ecf6aa00..0faaa7384ff 100644 --- a/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py +++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_base.py @@ -102,7 +102,7 @@ class FleetDistRunnerBase(object): def run_pserver(self, args): fleet.init(self.build_role(args)) strategy = self.build_strategy(args) - avg_cost = self.net() + avg_cost = self.net(args) self.build_optimizer(avg_cost, strategy) fleet.init_server() @@ -111,24 +111,18 @@ class FleetDistRunnerBase(object): def run_dataset_trainer(self, args): fleet.init(self.build_role(args)) strategy = self.build_strategy(args) - avg_cost = self.net() + avg_cost = self.net(args) self.build_optimizer(avg_cost, strategy) out = self.do_dataset_training(fleet) def run_pyreader_trainer(self, args): fleet.init(self.build_role(args)) strategy = self.build_strategy(args) - avg_cost = self.net() - self.reader = fluid.io.PyReader( - feed_list=self.feeds, - capacity=64, - iterable=False, - use_double_buffer=False) - + avg_cost = self.net(args) self.build_optimizer(avg_cost, strategy) out = self.do_pyreader_training(fleet) - def net(self, batch_size=4, lr=0.01): + def net(self, args, batch_size=4, 
         raise NotImplementedError(
             "net should be implemented by child classes.")
diff --git a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
index 0e64e318cc5..a2782794da7 100644
--- a/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
+++ b/python/paddle/fluid/tests/unittests/test_dist_fleet_ctr.py
@@ -34,7 +34,8 @@ class TestDistMnistSync2x2(TestFleetBase):
             "PYTHONPATH": os.getenv("PYTHONPATH", ""),
             "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
             "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
-            "http_proxy": ""
+            "http_proxy": "",
+            "CPU_NUM": "2"
         }

         required_envs.update(need_envs)
@@ -65,7 +66,8 @@ class TestDistMnistAsync2x2(TestFleetBase):
             "PYTHONPATH": os.getenv("PYTHONPATH", ""),
             "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
             "FLAGS_rpc_deadline": "5000",  # 5sec to fail fast
-            "http_proxy": ""
+            "http_proxy": "",
+            "CPU_NUM": "2"
         }

         required_envs.update(need_envs)
@@ -129,9 +131,9 @@ class TestDistCtrHalfAsync2x2(TestFleetBase):
             "LD_LIBRARY_PATH": os.getenv("LD_LIBRARY_PATH", ""),
             "FLAGS_rpc_deadline": "30000",  # 30sec to fail fast
             "http_proxy": "",
-            "FLAGS_communicator_send_queue_size": "1",
-            "FLAGS_communicator_max_merge_var_num": "1",
-            "CPU_NUM": "1",
+            "FLAGS_communicator_send_queue_size": "2",
+            "FLAGS_communicator_max_merge_var_num": "2",
+            "CPU_NUM": "2",
             "SAVE_MODEL": "0"
         }
--
GitLab
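Usage note: the barrier arms the distributed Communicator with one trigger per
place when AsyncSSAGraphExecutor starts a run (BarrierTriggerReset), and each
trainer thread decrements it as its graph finishes (BarrierTriggerDecrement),
so all threads of a CompiledProgram leave an iteration together in half-async
parameter-server training. Below is a minimal sketch of setting the new flag
from Python, assuming the Fluid API of this release; the tiny placeholder
network is illustrative only, and in a real fleet job HalfAsyncStrategy now
sets the flag automatically, as the distributed_strategy.py hunk above shows:

    import paddle.fluid as fluid

    # Tiny placeholder model so the sketch is self-contained.
    x = fluid.layers.data(name="x", shape=[1], dtype="float32")
    avg_cost = fluid.layers.mean(fluid.layers.fc(input=x, size=1))

    exec_strategy = fluid.ExecutionStrategy()
    # Property added by this patch in pybind.cc; it maps to
    # ExecutionStrategy::thread_barrier_ on the C++ side.
    exec_strategy.use_thread_barrier = True

    compiled = fluid.CompiledProgram(
        fluid.default_main_program()).with_data_parallel(
            loss_name=avg_cost.name, exec_strategy=exec_strategy)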