From 2265d091e6a0e18b48f801e73f048112ecc24904 Mon Sep 17 00:00:00 2001 From: chengduo Date: Thu, 28 Mar 2019 07:42:52 -0500 Subject: [PATCH] Fix threaded executor bug (#16508) * fix threaded executor bug test=develop * change the order of class member test=develop * Fix Travis CI test=develop --- .../fast_threaded_ssa_graph_executor.cc | 5 +++-- .../fast_threaded_ssa_graph_executor.h | 19 ++++++++++++------- .../details/threaded_ssa_graph_executor.cc | 8 ++++---- .../details/threaded_ssa_graph_executor.h | 19 +++++++++---------- 4 files changed, 28 insertions(+), 23 deletions(-) diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc index d4fbea9d95..297ee92fc3 100644 --- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.cc @@ -31,9 +31,10 @@ FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor( local_scopes_(local_scopes), places_(places), graph_(graph), + fetch_ctxs_(places), pool_(strategy.num_threads_), - prepare_pool_(1), // add one more thread for generate op_deps - fetch_ctxs_(places) { + // add one more thread for generate op_deps + prepare_pool_(1) { for (auto &op : ir::FilterByNodeWrapper(*graph_)) { int dep = static_cast(op->NotReadyInputSize()); op_deps_.emplace(op, dep); diff --git a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h index 970298950c..f6d5160e75 100644 --- a/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h @@ -14,7 +14,9 @@ #pragma once #include +#include #include +#include #include #include "paddle/fluid/framework/blocking_queue.h" #include "paddle/fluid/framework/details/exception_holder.h" @@ -37,6 +39,8 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { const ir::Graph &Graph() const override; private: + // Note(zcd): the ThreadPool should be placed last so that ThreadPool should + // be destroyed first. ExecutionStrategy strategy_; std::vector local_scopes_; std::vector places_; @@ -45,21 +49,22 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor { std::unordered_map op_deps_; std::vector bootstrap_ops_; - ::ThreadPool pool_; - ::ThreadPool prepare_pool_; platform::DeviceContextPool fetch_ctxs_; std::atomic remaining_; + std::future< + std::unique_ptr>>> + atomic_op_deps_; + ExceptionHolder exception_; + + ::ThreadPool pool_; + ::ThreadPool prepare_pool_; + void RunOpAsync(std::unordered_map> *op_deps, OpHandleBase *op, const std::shared_ptr> &complete_q); void PrepareAtomicOpDeps(); - - std::future< - std::unique_ptr>>> - atomic_op_deps_; - ExceptionHolder exception_; }; } // namespace details } // namespace framework diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc index c4254bbadf..c00932a7bd 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.cc @@ -24,13 +24,13 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor( const ExecutionStrategy &strategy, const std::vector &local_scopes, const std::vector &places, ir::Graph *graph) : graph_(graph), - pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) - : nullptr), - prepare_pool_(1), local_scopes_(local_scopes), places_(places), fetch_ctxs_(places), - strategy_(strategy) { + strategy_(strategy), + prepare_pool_(1), + pool_(strategy.num_threads_ >= 2 ? new ::ThreadPool(strategy.num_threads_) + : nullptr) { PrepareOpDeps(); CopyOpDeps(); } diff --git a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h index b9bccba8fa..1fa5196970 100644 --- a/paddle/fluid/framework/details/threaded_ssa_graph_executor.h +++ b/paddle/fluid/framework/details/threaded_ssa_graph_executor.h @@ -63,13 +63,20 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { details::OpHandleBase *op); private: + // Note(zcd): the ThreadPool should be placed last so that ThreadPool should + // be destroyed first. ir::Graph *graph_; - std::unique_ptr<::ThreadPool> pool_; - ::ThreadPool prepare_pool_; std::vector local_scopes_; std::vector places_; platform::DeviceContextPool fetch_ctxs_; ExceptionHolder exception_holder_; + std::unique_ptr op_deps_; + std::future> op_deps_futures_; + ExecutionStrategy strategy_; + // use std::list because clear(), push_back, and for_each are O(1) + std::list> run_op_futures_; + ::ThreadPool prepare_pool_; + std::unique_ptr<::ThreadPool> pool_; void InsertPendingOp(std::unordered_map *pending_ops, OpHandleBase *op_instance) const; @@ -88,14 +95,6 @@ class ThreadedSSAGraphExecutor : public SSAGraphExecutor { void PrepareOpDeps(); void CopyOpDeps(); - - private: - std::future> op_deps_futures_; - - ExecutionStrategy strategy_; - std::unique_ptr op_deps_; - // use std::list because clear(), push_back, and for_each are O(1) - std::list> run_op_futures_; }; } // namespace details -- GitLab