diff --git a/paddle/fluid/framework/CMakeLists.txt b/paddle/fluid/framework/CMakeLists.txt
index 1d62792b80dd002b894da28be9162fc7d3ce054e..30c8f04da23ca9125792dab466002da80d1776bf 100644
--- a/paddle/fluid/framework/CMakeLists.txt
+++ b/paddle/fluid/framework/CMakeLists.txt
@@ -100,7 +100,11 @@ else()
 endif()
 
-cc_library(parallel_executor SRCS parallel_executor.cc DEPS threaded_ssa_graph_executor scope_buffered_ssa_graph_executor graph graph_viz_pass multi_devices_graph_pass multi_devices_graph_print_pass multi_devices_graph_check_pass)
+cc_library(parallel_executor SRCS parallel_executor.cc DEPS
+        threaded_ssa_graph_executor scope_buffered_ssa_graph_executor
+        graph graph_viz_pass multi_devices_graph_pass
+        multi_devices_graph_print_pass multi_devices_graph_check_pass
+        fast_threaded_ssa_graph_executor)
 
 cc_library(prune SRCS prune.cc DEPS framework_proto)
 cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
 
diff --git a/paddle/fluid/framework/details/execution_strategy.h b/paddle/fluid/framework/details/execution_strategy.h
index 716d674fa29bad9321fc20979775c06f26bf4679..5183be878eb49cccc68603c3fdd8023be5578036 100644
--- a/paddle/fluid/framework/details/execution_strategy.h
+++ b/paddle/fluid/framework/details/execution_strategy.h
@@ -19,10 +19,13 @@ namespace framework {
 namespace details {
 
 struct ExecutionStrategy {
+  enum ExecutorType { kDefault = 0, kExperimental = 1 };
+
   size_t num_threads_{0};
   bool use_cuda_{true};
   bool allow_op_delay_{false};
   size_t num_iteration_per_drop_scope_{100};
+  ExecutorType type_{kDefault};
 };
 
 }  // namespace details
diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
index 08b0db4270c525c9041d86bc83d51b270a1422ac..227a366f22fbbfebf68b53d321cf8daff3300662 100644
--- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
+++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc
@@ -105,7 +105,7 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
         for (int i = 0; i < remaining; ++i) {
           complete_q.Pop();
         }
-        LOG(FATAL) << "On exception thrown, not implemented";
+        exception_.ReThrow();
       }
       num_complete += num_comp;
     }
@@ -127,6 +127,7 @@ void FastThreadedSSAGraphExecutor::RunOpAsync(
       op_to_run->Run(strategy_.use_cuda_);
       ++complete;
     } catch (...) {
+      exception_.Catch(std::current_exception());
       --remaining_;
       complete_q->Push(-1UL);
       return;
@@ -161,6 +162,8 @@ void FastThreadedSSAGraphExecutor::PrepareAtomicOpDeps() {
         std::unordered_map<OpHandleBase *, std::atomic<int>>>(op_deps);
   });
 }
+
+const ir::Graph &FastThreadedSSAGraphExecutor::Graph() const { return *graph_; }
 }  // namespace details
 }  // namespace framework
 }  // namespace paddle
diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
index 48f741ceccfb46499a35c2e921ab8f71da7bc6b9..dad3a231cba6402f57ba654a9ac5fb520b9c8f04 100644
--- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
+++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h
@@ -34,6 +34,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
                                const std::vector<platform::Place> &places,
                                std::unique_ptr<ir::Graph> &&graph);
   FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
+  const ir::Graph &Graph() const override;
 
  private:
   ExecutionStrategy strategy_;
@@ -56,6 +57,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
   std::future<
       std::unique_ptr<std::unordered_map<OpHandleBase *, std::atomic<int>>>>
       atomic_op_deps_;
+  ExceptionHolder exception_;
 };
 }  // namespace details
 }  // namespace framework
diff --git a/paddle/fluid/framework/parallel_executor.cc b/paddle/fluid/framework/parallel_executor.cc
index 275cb8c592c3c0b153d31149570cd6596b9e1a7f..81cb24bdda6b87a3d708cf5047dce05d5020a0d5 100644
--- a/paddle/fluid/framework/parallel_executor.cc
+++ b/paddle/fluid/framework/parallel_executor.cc
@@ -25,6 +25,7 @@ limitations under the License. */
 #include "paddle/fluid/platform/nccl_helper.h"
 #endif
 
+#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
 #include "paddle/fluid/framework/details/multi_devices_graph_check_pass.h"
 #include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h"
 #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
@@ -193,8 +194,14 @@ ParallelExecutor::ParallelExecutor(
       member_->local_scopes_, member_->use_cuda_, build_strategy);
 #endif
 
-  member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
-      exec_strategy, member_->local_scopes_, places, std::move(graph)));
+  if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
+    member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
+        exec_strategy, member_->local_scopes_, places, std::move(graph)));
+  } else {
+    member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
+        exec_strategy, member_->local_scopes_, places, std::move(graph)));
+  }
+
   member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
       exec_strategy, member_->local_scopes_, std::move(var_infos),
       member_->places_, std::move(member_->executor_)));
diff --git a/paddle/fluid/pybind/pybind.cc b/paddle/fluid/pybind/pybind.cc
index 7127bb38f6ddf8a55c1741d1f0ef18c8d9067fba..4fac04c5999712a14807861309ebff7190f584a5 100644
--- a/paddle/fluid/pybind/pybind.cc
+++ b/paddle/fluid/pybind/pybind.cc
@@ -593,8 +593,8 @@ All parameter, weight, gradient are variables in Paddle.
 
   // -- python binds for parallel executor.
   py::class_<ParallelExecutor> pe(m, "ParallelExecutor");
-  py::class_<ExecutionStrategy>(pe, "ExecutionStrategy")
-      .def(py::init())
+  py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy");
+  exec_strategy.def(py::init())
       .def_property(
           "num_threads",
           [](const ExecutionStrategy &self) { return self.num_threads_; },
@@ -621,6 +621,15 @@ All parameter, weight, gradient are variables in Paddle.
           [](ExecutionStrategy &self, size_t num_iteration_per_drop_scope) {
             self.num_iteration_per_drop_scope_ = num_iteration_per_drop_scope;
           });
+  py::enum_<ExecutionStrategy::ExecutorType>(exec_strategy, "ExecutorType")
+      .value("Default", ExecutionStrategy::kDefault)
+      .value("Experimental", ExecutionStrategy::kExperimental);
+  exec_strategy.def_property(
+      "type", [](const ExecutionStrategy &self) { return self.type_; },
+      [](ExecutionStrategy &self, ExecutionStrategy::ExecutorType type) {
+        self.type_ = type;
+      });
+
   py::class_<BuildStrategy> build_strategy(pe, "BuildStrategy");
 
   py::enum_<BuildStrategy::ReduceStrategy>(build_strategy, "ReduceStrategy")
diff --git a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
index 67c35e9de7e83699bf30ca946856bb907152cbdd..30f74c8ebcebd1824cf147ec61f1ada6eb73b513 100644
--- a/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
+++ b/python/paddle/fluid/tests/unittests/parallel_executor_test_base.py
@@ -36,7 +36,8 @@ class TestParallelExecutorBase(unittest.TestCase):
                                   seed=None,
                                   use_parallel_executor=True,
                                   use_reduce=False,
-                                  optimizer=fluid.optimizer.Adam):
+                                  optimizer=fluid.optimizer.Adam,
+                                  use_fast_executor=False):
         def run_executor(exe, feed, fetch_list, program=None):
             if isinstance(exe, fluid.ParallelExecutor):
                 res = exe.run(fetch_list=fetch_list, feed=feed)
@@ -69,6 +70,8 @@ class TestParallelExecutorBase(unittest.TestCase):
         startup_exe.run(startup)
         exec_strategy = fluid.ExecutionStrategy()
         exec_strategy.allow_op_delay = allow_op_delay
+        if use_fast_executor:
+            exec_strategy.type = fluid.ExecutionStrategy.ExecutorType.Experimental
 
         build_strategy = fluid.BuildStrategy()
         build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
diff --git a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
index 9448d89cd58f4e5cff4bac49821fbc44c5a46246..4750783d7e70fbfef61e4eebc1d5419dc56ac714 100644
--- a/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
+++ b/python/paddle/fluid/tests/unittests/test_parallel_executor_mnist.py
@@ -181,7 +181,9 @@ class TestMNIST(TestParallelExecutorBase):
             use_parallel_executor=True)
 
         self.assertAlmostEquals(
-            np.mean(parallel_first_loss), single_first_loss, delta=1e-6)
+            np.mean(parallel_first_loss),
+            single_first_loss,
+            delta=1e-6, )
         self.assertAlmostEquals(
             np.mean(parallel_last_loss), single_last_loss, delta=1e-6)
 
@@ -189,7 +191,7 @@ class TestMNIST(TestParallelExecutorBase):
         self.check_simple_fc_parallel_accuracy(True)
         self.check_simple_fc_parallel_accuracy(False)
 
-    def check_batchnorm_fc_convergence(self, use_cuda):
+    def check_batchnorm_fc_convergence(self, use_cuda, use_fast_executor):
         if use_cuda and not core.is_compiled_with_cuda():
             return
 
@@ -201,11 +203,13 @@ class TestMNIST(TestParallelExecutorBase):
             fc_with_batchnorm,
             feed_dict={"image": img,
                        "label": label},
-            use_cuda=use_cuda)
+            use_cuda=use_cuda,
+            use_fast_executor=use_fast_executor)
 
     def test_batchnorm_fc(self):
-        self.check_batchnorm_fc_convergence(True)
-        self.check_batchnorm_fc_convergence(False)
+        for use_cuda in (False, True):
+            for use_fast_executor in (False, True):
+                self.check_batchnorm_fc_convergence(use_cuda, use_fast_executor)
 
     def test_batchnorm_fc_with_new_strategy(self):
         # FIXME(zcd): close this test temporally.
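Usage sketch: the patch exposes the executor switch to Python through the new
ExecutionStrategy.ExecutorType enum and the "type" property, mirroring what
parallel_executor_test_base.py does above. The surrounding program setup and
the `loss` variable below are assumed to exist and are not part of this patch.

    import paddle.fluid as fluid

    exec_strategy = fluid.ExecutionStrategy()
    # Opt in to the experimental FastThreadedSSAGraphExecutor; leaving the
    # property at ExecutorType.Default keeps the existing
    # ThreadedSSAGraphExecutor.
    exec_strategy.type = fluid.ExecutionStrategy.ExecutorType.Experimental

    # `loss` is assumed to come from a network built earlier in the program.
    pe = fluid.ParallelExecutor(
        use_cuda=True, loss_name=loss.name, exec_strategy=exec_strategy)

With the LOG(FATAL) replaced by ExceptionHolder::ReThrow(), an op failure in
the fast executor should now propagate to the caller as an exception instead
of aborting the process.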