未验证 提交 c6af7201 编写于 作者: Y Yu Yang 提交者: GitHub

Merge pull request #12692 from reyoung/feature/fast_executor

Feature/fast executor
...@@ -100,7 +100,11 @@ else() ...@@ -100,7 +100,11 @@ else()
endif() endif()
cc_library(parallel_executor SRCS parallel_executor.cc DEPS threaded_ssa_graph_executor scope_buffered_ssa_graph_executor graph graph_viz_pass multi_devices_graph_pass multi_devices_graph_print_pass multi_devices_graph_check_pass) cc_library(parallel_executor SRCS parallel_executor.cc DEPS
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor
graph graph_viz_pass multi_devices_graph_pass
multi_devices_graph_print_pass multi_devices_graph_check_pass
fast_threaded_ssa_graph_executor)
cc_library(prune SRCS prune.cc DEPS framework_proto) cc_library(prune SRCS prune.cc DEPS framework_proto)
cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context) cc_test(prune_test SRCS prune_test.cc DEPS op_info prune recurrent_op device_context)
......
...@@ -42,3 +42,5 @@ cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_b ...@@ -42,3 +42,5 @@ cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_b
cc_library(scope_buffered_ssa_graph_executor SRCS scope_buffered_ssa_graph_executor.cc DEPS ssa_graph_executor) cc_library(scope_buffered_ssa_graph_executor SRCS scope_buffered_ssa_graph_executor.cc DEPS ssa_graph_executor)
#cc_test(reduce_op_handle_test SRCS reduce_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory #cc_test(reduce_op_handle_test SRCS reduce_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
# device_context reduce_op_handle ) # device_context reduce_op_handle )
cc_library(fast_threaded_ssa_graph_executor SRCS fast_threaded_ssa_graph_executor.cc
DEPS fetch_op_handle ssa_graph_executor scope simple_threadpool device_context)
...@@ -19,10 +19,13 @@ namespace framework { ...@@ -19,10 +19,13 @@ namespace framework {
namespace details { namespace details {
struct ExecutionStrategy { struct ExecutionStrategy {
enum ExecutorType { kDefault = 0, kExperimental = 1 };
size_t num_threads_{0}; size_t num_threads_{0};
bool use_cuda_{true}; bool use_cuda_{true};
bool allow_op_delay_{false}; bool allow_op_delay_{false};
size_t num_iteration_per_drop_scope_{100}; size_t num_iteration_per_drop_scope_{100};
ExecutorType type_{kDefault};
}; };
} // namespace details } // namespace details
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/fetch_op_handle.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
namespace paddle {
namespace framework {
namespace details {
FastThreadedSSAGraphExecutor::FastThreadedSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> &&graph)
: strategy_(strategy),
local_scopes_(local_scopes),
places_(places),
graph_(std::move(graph)),
pool_(strategy.num_threads_ +
1), // add one more thread for generate op_deps
fetch_ctxs_(places) {
auto &ops = graph_->Get<details::GraphOps>("ops");
for (auto &op : ops) {
int dep = static_cast<int>(op->NotReadyInputSize());
op_deps_.emplace(op.get(), dep);
if (dep == 0) {
bootstrap_ops_.emplace_back(op.get());
}
}
PrepareAtomicOpDeps();
}
FeedFetchList FastThreadedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
std::unique_ptr<std::unordered_map<OpHandleBase *, std::atomic<int>>>
op_deps = atomic_op_deps_.get();
PrepareAtomicOpDeps();
paddle::framework::FeedFetchList fetches;
fetches.resize(fetch_tensors.size());
std::unordered_map<std::string, std::vector<VarHandleBase *>> fetched_vars;
std::vector<std::unique_ptr<ir::Node>> fetch_nodes;
std::vector<std::unique_ptr<FetchOpHandle>> fetch_ops;
for (auto &fetch_var_name : fetch_tensors) {
for (auto &var_map : graph_->Get<details::GraphVars>("vars")) {
auto it = var_map.find(fetch_var_name);
if (it != var_map.end()) {
fetched_vars[fetch_var_name].push_back(it->second.rbegin()->get());
}
}
}
for (size_t i = 0; i < fetch_tensors.size(); ++i) {
auto &var_name = fetch_tensors[i];
auto fetched_var_it = fetched_vars.find(var_name);
PADDLE_ENFORCE(fetched_var_it != fetched_vars.end(),
"Cannot find fetched variable.(Perhaps the main_program "
"is not set to ParallelExecutor)");
auto &vars = fetched_var_it->second;
fetch_nodes.emplace_back(new ir::Node("fetch", ir::Node::Type::kOperation));
auto *op = new FetchOpHandle(fetch_nodes.back().get(), &fetches, i,
&local_scopes_);
fetch_ops.emplace_back(op);
for (auto &p : places_) {
op->SetDeviceContext(p, fetch_ctxs_.Get(p));
}
for (auto *var : vars) {
op->AddInput(var);
}
(*op_deps)[op] = static_cast<int>(op->NotReadyInputSize());
}
size_t num_complete = 0;
remaining_ = 0;
BlockingQueue<size_t> complete_q;
for (auto op : bootstrap_ops_) {
RunOpAsync(op_deps.get(), op, &complete_q);
}
while (num_complete != op_deps->size()) {
size_t num_comp = complete_q.Pop();
if (num_comp == -1UL) {
int remaining = 0;
while (true) {
remaining = remaining_;
if (remaining == 0) {
break;
}
for (int i = 0; i < remaining; ++i) {
complete_q.Pop();
}
}
exception_.ReThrow();
}
num_complete += num_comp;
}
// Wait FetchOps.
if (!fetch_ops.empty()) {
fetch_ops.clear();
}
return fetches;
}
void FastThreadedSSAGraphExecutor::RunOpAsync(
std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps,
OpHandleBase *op, BlockingQueue<size_t> *complete_q) {
++remaining_;
this->pool_.enqueue([=] {
OpHandleBase *op_to_run = op;
size_t complete = 0;
while (op_to_run != nullptr) {
try {
op_to_run->Run(strategy_.use_cuda_);
++complete;
} catch (...) {
exception_.Catch(std::current_exception());
--remaining_;
complete_q->Push(-1UL);
return;
}
auto &outputs = op_to_run->Outputs();
op_to_run = nullptr;
for (auto &output : outputs) {
for (auto &pending_op : output->PendingOps()) {
std::atomic<int> &deps = op_deps->at(pending_op);
if (deps.fetch_sub(1) == 1) { // pending_op ready
if (op_to_run == nullptr) {
op_to_run = pending_op;
} else {
this->RunOpAsync(op_deps, pending_op, complete_q);
}
}
}
}
}
--remaining_;
complete_q->Push(complete);
});
}
void FastThreadedSSAGraphExecutor::PrepareAtomicOpDeps() {
atomic_op_deps_ = pool_.enqueue([&] {
std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps =
new std::unordered_map<OpHandleBase *, std::atomic<int>>;
for (auto &pair : op_deps_) {
(*op_deps)[pair.first] = pair.second;
}
return std::unique_ptr<
std::unordered_map<OpHandleBase *, std::atomic<int>>>(op_deps);
});
}
const ir::Graph &FastThreadedSSAGraphExecutor::Graph() const { return *graph_; }
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/details/exception_holder.h"
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/details/ssa_graph_executor.h"
namespace paddle {
namespace framework {
class Scope;
namespace details {
class OpHandleBase;
class FastThreadedSSAGraphExecutor : public SSAGraphExecutor {
public:
FastThreadedSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::unique_ptr<ir::Graph> &&graph);
FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
const ir::Graph &Graph() const override;
private:
ExecutionStrategy strategy_;
std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_;
std::unique_ptr<ir::Graph> graph_;
std::unordered_map<OpHandleBase *, int> op_deps_;
std::vector<OpHandleBase *> bootstrap_ops_;
::ThreadPool pool_;
platform::DeviceContextPool fetch_ctxs_;
std::atomic<int> remaining_;
void RunOpAsync(std::unordered_map<OpHandleBase *, std::atomic<int>> *op_deps,
OpHandleBase *op, BlockingQueue<size_t> *complete_q);
void PrepareAtomicOpDeps();
std::future<
std::unique_ptr<std::unordered_map<OpHandleBase *, std::atomic<int>>>>
atomic_op_deps_;
ExceptionHolder exception_;
};
} // namespace details
} // namespace framework
} // namespace paddle
...@@ -158,6 +158,16 @@ void OpHandleBase::RunAndRecordEvent(platform::Place p, ...@@ -158,6 +158,16 @@ void OpHandleBase::RunAndRecordEvent(platform::Place p,
#endif #endif
} }
size_t OpHandleBase::NotReadyInputSize() const {
std::unordered_set<VarHandleBase *> res;
for (auto *var : inputs_) {
if (var->GeneratedOp() != nullptr) {
res.emplace(var);
}
}
return res.size();
}
} // namespace details } // namespace details
} // namespace framework } // namespace framework
} // namespace paddle } // namespace paddle
...@@ -81,6 +81,8 @@ class OpHandleBase { ...@@ -81,6 +81,8 @@ class OpHandleBase {
return res.size(); return res.size();
} }
size_t NotReadyInputSize() const;
const std::vector<VarHandleBase *> &Outputs() const { return outputs_; } const std::vector<VarHandleBase *> &Outputs() const { return outputs_; }
size_t NoDummyInputSize() const; size_t NoDummyInputSize() const;
......
...@@ -25,6 +25,7 @@ limitations under the License. */ ...@@ -25,6 +25,7 @@ limitations under the License. */
#include "paddle/fluid/platform/nccl_helper.h" #include "paddle/fluid/platform/nccl_helper.h"
#endif #endif
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_graph_check_pass.h" #include "paddle/fluid/framework/details/multi_devices_graph_check_pass.h"
#include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h" #include "paddle/fluid/framework/details/multi_devices_graph_print_pass.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h" #include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
...@@ -193,8 +194,14 @@ ParallelExecutor::ParallelExecutor( ...@@ -193,8 +194,14 @@ ParallelExecutor::ParallelExecutor(
member_->local_scopes_, member_->use_cuda_, build_strategy); member_->local_scopes_, member_->use_cuda_, build_strategy);
#endif #endif
member_->executor_.reset(new details::ThreadedSSAGraphExecutor( if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
exec_strategy, member_->local_scopes_, places, std::move(graph))); member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, places, std::move(graph)));
} else {
member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, places, std::move(graph)));
}
member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor( member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, std::move(var_infos), exec_strategy, member_->local_scopes_, std::move(var_infos),
member_->places_, std::move(member_->executor_))); member_->places_, std::move(member_->executor_)));
......
...@@ -596,8 +596,8 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -596,8 +596,8 @@ All parameter, weight, gradient are variables in Paddle.
// -- python binds for parallel executor. // -- python binds for parallel executor.
py::class_<ParallelExecutor> pe(m, "ParallelExecutor"); py::class_<ParallelExecutor> pe(m, "ParallelExecutor");
py::class_<ExecutionStrategy>(pe, "ExecutionStrategy") py::class_<ExecutionStrategy> exec_strategy(pe, "ExecutionStrategy");
.def(py::init()) exec_strategy.def(py::init())
.def_property( .def_property(
"num_threads", "num_threads",
[](const ExecutionStrategy &self) { return self.num_threads_; }, [](const ExecutionStrategy &self) { return self.num_threads_; },
...@@ -624,6 +624,16 @@ All parameter, weight, gradient are variables in Paddle. ...@@ -624,6 +624,16 @@ All parameter, weight, gradient are variables in Paddle.
[](ExecutionStrategy &self, size_t num_iteration_per_drop_scope) { [](ExecutionStrategy &self, size_t num_iteration_per_drop_scope) {
self.num_iteration_per_drop_scope_ = num_iteration_per_drop_scope; self.num_iteration_per_drop_scope_ = num_iteration_per_drop_scope;
}); });
exec_strategy.def_property(
"use_experimental_executor",
[](const ExecutionStrategy &self) {
return self.type_ == ExecutionStrategy::kExperimental;
},
[](ExecutionStrategy &self, bool experimental) {
self.type_ = experimental ? ExecutionStrategy::kExperimental
: ExecutionStrategy::kDefault;
});
py::class_<BuildStrategy> build_strategy(pe, "BuildStrategy"); py::class_<BuildStrategy> build_strategy(pe, "BuildStrategy");
py::enum_<BuildStrategy::ReduceStrategy>(build_strategy, "ReduceStrategy") py::enum_<BuildStrategy::ReduceStrategy>(build_strategy, "ReduceStrategy")
......
...@@ -38,7 +38,8 @@ class TestParallelExecutorBase(unittest.TestCase): ...@@ -38,7 +38,8 @@ class TestParallelExecutorBase(unittest.TestCase):
seed=None, seed=None,
use_parallel_executor=True, use_parallel_executor=True,
use_reduce=False, use_reduce=False,
optimizer=fluid.optimizer.Adam): optimizer=fluid.optimizer.Adam,
use_fast_executor=False):
def run_executor(exe, feed, fetch_list, program=None): def run_executor(exe, feed, fetch_list, program=None):
if isinstance(exe, fluid.ParallelExecutor): if isinstance(exe, fluid.ParallelExecutor):
res = exe.run(fetch_list=fetch_list, feed=feed) res = exe.run(fetch_list=fetch_list, feed=feed)
...@@ -71,6 +72,8 @@ class TestParallelExecutorBase(unittest.TestCase): ...@@ -71,6 +72,8 @@ class TestParallelExecutorBase(unittest.TestCase):
startup_exe.run(startup) startup_exe.run(startup)
exec_strategy = fluid.ExecutionStrategy() exec_strategy = fluid.ExecutionStrategy()
exec_strategy.allow_op_delay = allow_op_delay exec_strategy.allow_op_delay = allow_op_delay
if use_fast_executor:
exec_strategy.use_experimental_executor = True
build_strategy = fluid.BuildStrategy() build_strategy = fluid.BuildStrategy()
build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \ build_strategy.reduce_strategy = fluid.BuildStrategy.ReduceStrategy.Reduce \
......
...@@ -183,7 +183,9 @@ class TestMNIST(TestParallelExecutorBase): ...@@ -183,7 +183,9 @@ class TestMNIST(TestParallelExecutorBase):
use_parallel_executor=True) use_parallel_executor=True)
self.assertAlmostEquals( self.assertAlmostEquals(
np.mean(parallel_first_loss), single_first_loss, delta=1e-6) np.mean(parallel_first_loss),
single_first_loss,
delta=1e-6, )
self.assertAlmostEquals( self.assertAlmostEquals(
np.mean(parallel_last_loss), single_last_loss, delta=1e-6) np.mean(parallel_last_loss), single_last_loss, delta=1e-6)
...@@ -191,7 +193,7 @@ class TestMNIST(TestParallelExecutorBase): ...@@ -191,7 +193,7 @@ class TestMNIST(TestParallelExecutorBase):
self.check_simple_fc_parallel_accuracy(True) self.check_simple_fc_parallel_accuracy(True)
self.check_simple_fc_parallel_accuracy(False) self.check_simple_fc_parallel_accuracy(False)
def check_batchnorm_fc_convergence(self, use_cuda): def check_batchnorm_fc_convergence(self, use_cuda, use_fast_executor):
if use_cuda and not core.is_compiled_with_cuda(): if use_cuda and not core.is_compiled_with_cuda():
return return
...@@ -203,11 +205,13 @@ class TestMNIST(TestParallelExecutorBase): ...@@ -203,11 +205,13 @@ class TestMNIST(TestParallelExecutorBase):
fc_with_batchnorm, fc_with_batchnorm,
feed_dict={"image": img, feed_dict={"image": img,
"label": label}, "label": label},
use_cuda=use_cuda) use_cuda=use_cuda,
use_fast_executor=use_fast_executor)
def test_batchnorm_fc(self): def test_batchnorm_fc(self):
self.check_batchnorm_fc_convergence(True) for use_cuda in (False, True):
self.check_batchnorm_fc_convergence(False) for use_fast_executor in (False, True):
self.check_batchnorm_fc_convergence(use_cuda, use_fast_executor)
def test_batchnorm_fc_with_new_strategy(self): def test_batchnorm_fc_with_new_strategy(self):
# FIXME(zcd): close this test temporally. # FIXME(zcd): close this test temporally.
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册