Commit 05cadf1b authored by yuyang18

Add FastExecutor

Parent c6eb7a89
@@ -100,7 +100,11 @@ else()
 endif()
-cc_library(parallel_executor SRCS parallel_executor.cc DEPS threaded_ssa_graph_executor scope_buffered_ssa_graph_executor graph graph_viz_pass multi_devices_graph_pass multi_devices_graph_print_pass multi_devices_graph_check_pass)
+cc_library(parallel_executor SRCS parallel_executor.cc DEPS
+           threaded_ssa_graph_executor scope_buffered_ssa_graph_executor
+           graph graph_viz_pass multi_devices_graph_pass
+           multi_devices_graph_print_pass multi_devices_graph_check_pass
+           fast_threaded_ssa_graph_executor)
 cc_library(prune SRCS prune.cc DEPS framework_proto)
 cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
......
@@ -19,10 +19,13 @@ namespace framework {
 namespace details {
 struct ExecutionStrategy {
+  enum ExecutorType { kDefault = 0, kExperimental = 1 };
+
   size_t num_threads_{0};
   bool use_cuda_{true};
   bool allow_op_delay_{false};
   size_t num_iteration_per_drop_scope_{100};
+  ExecutorType type_{kDefault};
 };
 }  // namespace details
......
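The new type_ field joins the existing knobs on ExecutionStrategy. These fields are surfaced to Python by the pybind changes later in this commit; a minimal sketch of the Python-visible counterparts (property names taken from the binding code below, values are just the struct defaults):

    import paddle.fluid as fluid

    exec_strategy = fluid.ExecutionStrategy()
    exec_strategy.num_threads = 0                      # num_threads_{0}
    exec_strategy.allow_op_delay = False               # allow_op_delay_{false}
    exec_strategy.num_iteration_per_drop_scope = 100   # num_iteration_per_drop_scope_{100}
    # New in this commit: which SSA graph executor ParallelExecutor should use.
    exec_strategy.type = fluid.ExecutionStrategy.ExecutorType.Default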
@@ -105,7 +105,7 @@ FeedFetchList FastThreadedSSAGraphExecutor::Run(
       for (int i = 0; i < remaining; ++i) {
         complete_q.Pop();
       }
-      LOG(FATAL) << "On exception thrown, not implemented";
+      exception_.ReThrow();
     }
     num_complete += num_comp;
   }
@@ -127,6 +127,7 @@ void FastThreadedSSAGraphExecutor::RunOpAsync(
       op_to_run->Run(strategy_.use_cuda_);
       ++complete;
     } catch (...) {
+      exception_.Catch(std::current_exception());
       --remaining_;
       complete_q->Push(-1UL);
       return;
@@ -161,6 +162,8 @@ void FastThreadedSSAGraphExecutor::PrepareAtomicOpDeps() {
         std::unordered_map<OpHandleBase *, std::atomic<int>>>(op_deps);
   });
 }
+
+const ir::Graph &FastThreadedSSAGraphExecutor::Graph() const { return *graph_; }
 }  // namespace details
 }  // namespace framework
 }  // namespace paddle
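The hunks above replace the old LOG(FATAL) with an ExceptionHolder: each worker records the exception it hit in RunOpAsync, and Run() rethrows it after draining the completion queue. A minimal Python analogy of this catch-in-worker, rethrow-in-caller pattern (an illustration of the idea only, not the C++ implementation; all names below are made up):

    import threading

    class ExceptionHolder(object):
        """Keeps the first exception raised by any worker thread."""
        def __init__(self):
            self._lock = threading.Lock()
            self._exc = None

        def catch(self, exc):                 # analogous to exception_.Catch(...)
            with self._lock:
                if self._exc is None:
                    self._exc = exc

        def rethrow(self):                    # analogous to exception_.ReThrow()
            if self._exc is not None:
                raise self._exc

    def run_ops(ops):
        holder = ExceptionHolder()

        def worker(op):
            try:
                op()
            except Exception as e:            # analogous to catch (...) in RunOpAsync
                holder.catch(e)

        threads = [threading.Thread(target=worker, args=(op,)) for op in ops]
        for t in threads:
            t.start()
        for t in threads:                     # analogous to draining complete_q
            t.join()
        holder.rethrow()                      # surfaces the failure to the caller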
@@ -34,6 +34,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
                                const std::vector<platform::Place> &places,
                                std::unique_ptr<ir::Graph> &&graph);
   FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
+  const ir::Graph &Graph() const override;
 private:
   ExecutionStrategy strategy_;
@@ -56,6 +57,7 @@ class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
   std::future<
       std::unique_ptr<std::unordered_map<OpHandleBase *, std::atomic<int>>>>
       atomic_op_deps_;
+  ExceptionHolder exception_;
 };
 }  // namespace details
 }  // namespace framework
......
@@ -25,6 +25,7 @@ limitations under the License. */
 #include "paddle/fluid/platform/nccl_helper.h"
 #endif
+#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
 #include "paddle/fluid/framework/details/multi_devices_graph_check_pass.h"
 #include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h"
 #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
@@ -193,8 +194,14 @@ ParallelExecutor::ParallelExecutor(
         member_->local_scopes_, member_->use_cuda_, build_strategy);
 #endif
+  if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
     member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
         exec_strategy, member_->local_scopes_, places, std::move(graph)));
+  } else {
+    member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
+        exec_strategy, member_->local_scopes_, places, std::move(graph)));
+  }
   member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
       exec_strategy, member_->local_scopes_, std::move(var_infos),
       member_->places_, std::move(member_->executor_)));
......
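From Python, taking the FastThreadedSSAGraphExecutor branch above amounts to setting the strategy type before constructing ParallelExecutor (the binding itself is added in the pybind hunk below); whichever branch is taken, the result is still wrapped by ScopeBufferedSSAGraphExecutor. A rough end-to-end sketch against the 2018-era fluid API; the tiny FC network and SGD optimizer are only placeholders to have something runnable and are not part of this commit:

    import numpy as np
    import paddle.fluid as fluid

    # Placeholder program: one FC layer and a mean loss.
    x = fluid.layers.data(name='x', shape=[4], dtype='float32')
    loss = fluid.layers.mean(fluid.layers.fc(input=x, size=1))
    fluid.optimizer.SGD(learning_rate=0.01).minimize(loss)

    use_cuda = fluid.core.is_compiled_with_cuda()
    place = fluid.CUDAPlace(0) if use_cuda else fluid.CPUPlace()
    fluid.Executor(place).run(fluid.default_startup_program())

    exec_strategy = fluid.ExecutionStrategy()
    exec_strategy.type = fluid.ExecutionStrategy.ExecutorType.Experimental  # new in this commit

    pe = fluid.ParallelExecutor(use_cuda=use_cuda,
                                loss_name=loss.name,
                                exec_strategy=exec_strategy)
    loss_val, = pe.run(fetch_list=[loss.name],
                       feed={'x': np.random.random((32, 4)).astype('float32')})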
@@ -593,8 +593,8 @@ All parameter, weight, gradient are variables in Paddle.
   // -- python binds for parallel executor.
   py::class_<ParallelExecutor> pe(m, "ParallelExecutor");
-  py::class_<ExecutionStrategy>(pe, "ExecutionStrategy")
-      .def(py::init())
+  py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy");
+  exec_strategy.def(py::init())
       .def_property(
           "num_threads",
           [](const ExecutionStrategy &self) { return self.num_threads_; },
@@ -621,6 +621,15 @@ All parameter, weight, gradient are variables in Paddle.
           [](ExecutionStrategy &self, size_t num_iteration_per_drop_scope) {
             self.num_iteration_per_drop_scope_ = num_iteration_per_drop_scope;
           });
+  py::enum_<ExecutionStrategy::ExecutorType>(exec_strategy, "ExecutorType")
+      .value("Default", ExecutionStrategy::kDefault)
+      .value("Experimental", ExecutionStrategy::kExperimental);
+  exec_strategy.def_property(
+      "type", [](const ExecutionStrategy &self) { return self.type_; },
+      [](ExecutionStrategy &self, ExecutionStrategy::ExecutorType type) {
+        self.type_ = type;
+      });
   py::class_<BuildStrategy> build_strategy(pe, "BuildStrategy");
   py::enum_<BuildStrategy::ReduceStrategy>(build_strategy, "ReduceStrategy")
......
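In brief, the bindings above expose an ExecutorType enum and a type property on ExecutionStrategy; a small sketch of the resulting Python surface (enum and property names exactly as registered above):

    import paddle.fluid as fluid

    ExecutorType = fluid.ExecutionStrategy.ExecutorType  # enum registered on ExecutionStrategy
    exec_strategy = fluid.ExecutionStrategy()

    assert exec_strategy.type == ExecutorType.Default    # default comes from type_{kDefault}
    exec_strategy.type = ExecutorType.Experimental       # selects FastThreadedSSAGraphExecutor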
@@ -36,7 +36,8 @@ class TestParallelExecutorBase(unittest.TestCase):
                        seed=None,
                        use_parallel_executor=True,
                        use_reduce=False,
-                       optimizer=fluid.optimizer.Adam):
+                       optimizer=fluid.optimizer.Adam,
+                       use_fast_executor=False):
         def run_executor(exe, feed, fetch_list, program=None):
             if isinstance(exe, fluid.ParallelExecutor):
                 res = exe.run(fetch_list=fetch_list, feed=feed)
@@ -69,6 +70,8 @@ class TestParallelExecutorBase(unittest.TestCase):
         startup_exe.run(startup)
         exec_strategy = fluid.ExecutionStrategy()
         exec_strategy.allow_op_delay = allow_op_delay
+        if use_fast_executor:
+            exec_strategy.type = fluid.ExecutionStrategy.ExecutorType.Experimental
         build_strategy = fluid.BuildStrategy()
         build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
......
@@ -181,7 +181,9 @@ class TestMNIST(TestParallelExecutorBase):
             use_parallel_executor=True)
         self.assertAlmostEquals(
-            np.mean(parallel_first_loss), single_first_loss, delta=1e-6)
+            np.mean(parallel_first_loss),
+            single_first_loss,
+            delta=1e-6, )
         self.assertAlmostEquals(
             np.mean(parallel_last_loss), single_last_loss, delta=1e-6)
@@ -189,7 +191,7 @@ class TestMNIST(TestParallelExecutorBase):
         self.check_simple_fc_parallel_accuracy(True)
         self.check_simple_fc_parallel_accuracy(False)

-    def check_batchnorm_fc_convergence(self, use_cuda):
+    def check_batchnorm_fc_convergence(self, use_cuda, use_fast_executor):
         if use_cuda and not core.is_compiled_with_cuda():
             return
@@ -201,11 +203,13 @@ class TestMNIST(TestParallelExecutorBase):
             fc_with_batchnorm,
             feed_dict={"image": img,
                        "label": label},
-            use_cuda=use_cuda)
+            use_cuda=use_cuda,
+            use_fast_executor=use_fast_executor)

     def test_batchnorm_fc(self):
-        self.check_batchnorm_fc_convergence(True)
-        self.check_batchnorm_fc_convergence(False)
+        for use_cuda in (False, True):
+            for use_fast_executor in (False, True):
+                self.check_batchnorm_fc_convergence(use_cuda, use_fast_executor)

     def test_batchnorm_fc_with_new_strategy(self):
         # FIXME(zcd): close this test temporally.
......