Commit c9de6f1b authored by Yancey1989

init parallel graph mode

Parent 29d9fb53
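Overview: this commit wires up an experimental "parallel graph" execution mode. build_strategy.Apply() is invoked once per place so every device gets its own ir::Graph, each graph is driven by a dedicated ThreadedSSAGraphExecutor, and a new ParallelSSAGraphExecutor fans the per-device runs out over a thread pool; AllReduce then issues one ungrouped NCCL call per thread instead of one grouped multi-GPU call. As a minimal sketch (names taken from this diff), the mode is selected through the new ExecutorType value:

    details::ExecutionStrategy strategy;
    strategy.type_ = details::ExecutionStrategy::kParallelGraph;  // new in this commit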
......@@ -177,7 +177,7 @@ else()
endif()
cc_library(parallel_executor SRCS parallel_executor.cc DEPS
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor
threaded_ssa_graph_executor scope_buffered_ssa_graph_executor parallel_ssa_graph_executor
graph build_strategy
fast_threaded_ssa_graph_executor variable_helper)
......
......@@ -54,6 +54,8 @@ cc_library(ssa_graph_executor SRCS ssa_graph_executor.cc DEPS ${SSA_GRAPH_EXECUT
cc_library(threaded_ssa_graph_executor SRCS threaded_ssa_graph_executor.cc DEPS fetch_op_handle ssa_graph_executor scope
simple_threadpool device_context)
cc_library(parallel_ssa_graph_executor SRCS parallel_ssa_graph_executor.cc DEPS threaded_ssa_graph_executor)
cc_test(broadcast_op_test SRCS broadcast_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
device_context broadcast_op_handle)
cc_test(gather_op_test SRCS gather_op_handle_test.cc DEPS var_handle op_handle_base scope ddim memory
......
......@@ -46,20 +46,27 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
#endif
void AllReduceOpHandle::RunImpl() {
int64_t start_ts = GetTS();
int64_t func_ts = GetTS();
VLOG(5) << "all_reduce_op_handle::RunImpl start";
platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
// FIXME(typhoonzero): If scope0 (the global scope) has NCCL_ID_VAR,
// this is a distributed or inter-process call; find a better way.
#ifdef PADDLE_WITH_CUDA
if (NoDummyInputSize() == 1 &&
local_scopes_[0]->FindLocalVar(NCCL_ID_VARNAME) == nullptr) {
local_scopes_[0]->FindVar(NCCL_ID_VARNAME) == nullptr) {
#else
if (NoDummyInputSize() == 1) {
#endif
return; // No need to all reduce when GPU count = 1;
} else {
// Wait input done
start_ts = GetTS();
WaitInputVarGenerated();
VLOG(5) << "all_reduce_op_handle wait input var spent: "
<< GetTS() - start_ts << " (ns).";
start_ts = GetTS();
auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
auto out_var_handles = DynamicCast<VarHandle>(this->Outputs());
PADDLE_ENFORCE_EQ(
......@@ -100,6 +107,8 @@ void AllReduceOpHandle::RunImpl() {
}
int dev_id = boost::get<platform::CUDAPlace>(p).device;
VLOG(5) << "call allreduce: " << in_var_handles[i]->name_
<< " on dev: " << dev_id;
auto &nccl_ctx = nccl_ctxs_->at(dev_id);
auto stream = nccl_ctx.stream();
auto comm = nccl_ctx.comm_;
......@@ -110,11 +119,20 @@ void AllReduceOpHandle::RunImpl() {
});
}
this->RunAndRecordEvent([&] {
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
// TODO(Yancey1989): need allreduce operator to avoid this flag
if (nccl_ctxs_->need_group_call_) {
platform::NCCLGroupGuard guard;
for (auto &call : all_reduce_calls) {
call();
}
} else {
// only used when executor_type == ParallelGraph: one thread per GPU
// TODO(Yancey1989): use an allreduce operator to avoid this trick.
PADDLE_ENFORCE(all_reduce_calls.size() == 1UL);
all_reduce_calls[0]();
}
});
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
......@@ -144,6 +162,8 @@ void AllReduceOpHandle::RunImpl() {
}
}
}
VLOG(5) << "all_reduce_op_handle Impl spent: " << GetTS() - func_ts
<< " (ns).";
}
std::string AllReduceOpHandle::Name() const { return "all_reduce"; }
......
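Context for the need_group_call_ branch above: when a single thread issues allreduce for several GPUs, the calls must sit between ncclGroupStart()/ncclGroupEnd() so NCCL can launch them together without deadlocking; in ParallelGraph mode there is one thread per GPU, each issuing exactly one call, so no grouping is needed. A minimal sketch of the two patterns against the standard NCCL API (buffers, comms, and streams assumed already set up):

    // One thread drives all devices: the calls must be grouped.
    ncclGroupStart();
    for (int i = 0; i < ndev; ++i) {
      ncclAllReduce(sendbuf[i], recvbuf[i], count, ncclFloat, ncclSum,
                    comms[i], streams[i]);
    }
    ncclGroupEnd();

    // One thread per device (ParallelGraph): a single ungrouped call.
    ncclAllReduce(sendbuf[my_dev], recvbuf[my_dev], count, ncclFloat, ncclSum,
                  comms[my_dev], streams[my_dev]);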
......@@ -118,6 +118,7 @@ std::unique_ptr<ir::Graph> BuildStrategy::Apply(
std::unique_ptr<ir::Graph> graph(new ir::Graph(main_program));
for (std::shared_ptr<ir::Pass> &pass : pass_builder_->AllPasses()) {
VLOG(5) << "run pass: " << pass->Type();
if (pass->Type() == "multi_devices_pass") {
pass->Erase("places");
pass->SetNotOwned<const std::vector<platform::Place>>("places", &places);
......
......@@ -33,10 +33,18 @@ void ComputationOpHandle::RunImpl() {
op_->Run(*scope_->FindVar(kLocalExecScopeName)->Get<Scope *>(), place_);
};
if (is_lock_and_record_event_free_) {
// Note: std::string::compare() returns 0 on a match, so the original
// `Name().compare("conv2d") || ...` condition was always true.
if (Name() == "conv2d" || Name() == "conv2d_grad") {
int64_t start_ts = GetTS();
auto varname = DynamicCast<VarHandle>(this->Outputs())[0]->name_;
run_func();
VLOG(5) << Name() << "_op_handle: " << varname
<< " spent: " << GetTS() - start_ts << " (ns).";
} else {
this->RunAndRecordEvent(run_func);
if (is_lock_and_record_event_free_) {
run_func();
} else {
this->RunAndRecordEvent(run_func);
}
}
}
......
......@@ -17,6 +17,7 @@
#include <string>
#include <vector>
#include "paddle/fluid/framework/details/container_cast.h"
#include "paddle/fluid/framework/details/op_handle_base.h"
#include "paddle/fluid/framework/op_registry.h"
#include "paddle/fluid/framework/operator.h"
......
......@@ -20,7 +20,7 @@ namespace framework {
namespace details {
struct ExecutionStrategy {
enum ExecutorType { kDefault = 0, kExperimental = 1 };
enum ExecutorType { kDefault = 0, kExperimental = 1, kParallelGraph = 2 };
size_t num_threads_{0};
bool use_cuda_{true};
......
......@@ -300,7 +300,7 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
auto nodes = graph->ReleaseNodes();
ir::Graph &result = *graph;
int num_trainers = Get<int>(kNumTrainers);
// int num_trainers = Get<int>(kNumTrainers);
for (auto &node : nodes) {
if (node->IsVar() && node->Var()) {
......@@ -329,6 +329,7 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
std::unordered_map<std::string, int> sharded_var_device;
for (ir::Node *node : sorted_ops) {
VLOG(5) << "op name: " << node->Op()->Type();
if (boost::get<int>(
node->Op()->GetAttr(OpProtoAndCheckerMaker::OpRoleAttrName())) ==
static_cast<int>(OpRole::kRPC)) {
......@@ -365,9 +366,11 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
// is true only for the op that scales the final scalar loss.
// It also assumes backward op will always follow the forward op in
// the block.
VLOG(5) << "this is loss scale op!";
is_forwarding = false;
} else {
int op_dev_id = GetOpDeviceID(result, node, sharded_var_device);
VLOG(5) << "on device id: " << op_dev_id;
if (op_dev_id != -1) { // This op only runs on one specific device.
CreateComputationalOp(&result, node, op_dev_id);
for (ir::Node *n : node->outputs) {
......@@ -386,7 +389,8 @@ std::unique_ptr<ir::Graph> MultiDevSSAGraphBuilder::ApplyImpl(
CreateComputationalOps(&result, node, places_.size());
}
if (!is_forwarding && (places_.size() > 1 || num_trainers > 1)) {
// if (!is_forwarding && (places_.size() > 1 || num_trainers > 1)) {
if (!is_forwarding && nccl_ctxs_->contexts_.size() > 1) {
// Currently, we assume that once gradient is generated, it can be
// broadcast, and each gradient is only broadcast once.
if (static_cast<bool>(boost::get<int>(node->Op()->GetAttr(
......
......@@ -41,6 +41,7 @@ OpHandleBase::~OpHandleBase() {
void OpHandleBase::Run(bool use_cuda) {
#ifdef PADDLE_WITH_CUDA
int64_t start_ts = 0;
if (events_.empty() && use_cuda) {
for (auto &p : dev_ctxes_) {
int dev_id = boost::get<platform::CUDAPlace>(p.first).device;
......@@ -52,7 +53,6 @@ void OpHandleBase::Run(bool use_cuda) {
#else
PADDLE_ENFORCE(!use_cuda);
#endif
RunImpl();
}
......@@ -125,6 +125,7 @@ bool OpHandleBase::NeedWait(VarHandleBase *in_var) {
void OpHandleBase::RunAndRecordEvent(const std::function<void()> &callback) {
#ifdef PADDLE_WITH_CUDA
if (!events_.empty()) { // Use event
VLOG(5) << "events not empty";
std::function<void()> method = callback;
for (auto &p : dev_ctxes_) {
method = [method, p, this]() {
......
......@@ -26,7 +26,6 @@ namespace framework {
namespace details {
constexpr char kLocalExecScopeName[] = "@LCOAL_SCOPE@";
// Wraps ir::Node and provides helper utilities.
// It's responsible for populating necessary fields of ir::Node.
class OpHandleBase {
......
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
namespace paddle {
namespace framework {
namespace details {
ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::vector<std::unique_ptr<ir::Graph>> graphs)
: strategy_(std::move(strategy)),
local_scopes_(std::move(local_scopes)),
places_(std::move(places)),
graphs_(std::move(graphs)),
pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr) {
PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
for (size_t i = 0; i < places.size(); ++i) {
std::vector<framework::Scope *> scopes = {local_scopes_[i]};
std::vector<platform::Place> places = {places_[i]};
executors_.emplace_back(new details::ThreadedSSAGraphExecutor(
strategy_, scopes, places, std::move(graphs_[i])));
}
}
FeedFetchList ParallelSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
std::vector<std::future<void>> run_futures;
FeedFetchList fetch_data;
for (size_t i = 0; i < places_.size(); ++i) {
auto call = [this, i] {
// FIXME(Yancey1989): fetching data fails in this mode and still needs a fix.
std::vector<std::string> empty;
executors_[i]->Run(empty);
};
if (pool_) {
run_futures.emplace_back(pool_->enqueue(std::move(call)));
} else {
call();
}
}
if (pool_) {
for (auto &f : run_futures) {
f.wait();
}
}
return fetch_data;
}
} // namespace details
} // namespace framework
} // namespace paddle
// Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#pragma once
#include <string>
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
namespace paddle {
namespace framework {
namespace details {
class ParallelSSAGraphExecutor : public SSAGraphExecutor {
public:
ParallelSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::vector<std::unique_ptr<ir::Graph>> graphs);
~ParallelSSAGraphExecutor() final = default;
const ir::Graph &Graph() const override { return *graphs_[0]; }
FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
private:
ExecutionStrategy strategy_;
std::vector<Scope *> local_scopes_;
std::vector<platform::Place> places_;
std::vector<std::unique_ptr<ir::Graph>> graphs_;
std::unique_ptr<::ThreadPool> pool_;
std::vector<std::unique_ptr<details::ThreadedSSAGraphExecutor>> executors_;
};
} // namespace details
} // namespace framework
} // namespace paddle
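The Run() fan-out above is a plain scatter/wait: one closure per device executor, enqueued on the pool when there are two or more places and executed inline otherwise. A self-contained sketch of the same pattern using only the standard library (std::async standing in for ::ThreadPool, and a hypothetical RunOneGraph standing in for ThreadedSSAGraphExecutor::Run):

    #include <cstdio>
    #include <future>
    #include <vector>

    // Placeholder for running one device's graph to completion.
    void RunOneGraph(size_t dev_id) { std::printf("graph %zu done\n", dev_id); }

    void FanOut(size_t num_places) {
      std::vector<std::future<void>> futures;
      for (size_t i = 0; i < num_places; ++i) {
        if (num_places >= 2) {
          futures.emplace_back(std::async(std::launch::async, RunOneGraph, i));
        } else {
          RunOneGraph(i);  // single place: no pool, run inline
        }
      }
      for (auto &f : futures) f.wait();  // block until every device finishes
    }

    int main() { FanOut(4); }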
......@@ -27,39 +27,40 @@ namespace framework {
namespace details {
ScopeBufferedSSAGraphExecutor::ScopeBufferedSSAGraphExecutor(
ExecutionStrategy strategy, std::vector<Scope *> local_scopes,
std::vector<VariableInfo> var_infos, std::vector<platform::Place> places,
std::vector<std::vector<VariableInfo>> var_infos_list,
std::vector<platform::Place> places,
std::unique_ptr<SSAGraphExecutor> &&underlying_executor)
: strategy_(std::move(strategy)),
underlying_executor_(std::move(underlying_executor)),
local_scopes_(std::move(local_scopes)),
var_infos_(std::move(var_infos)),
var_infos_list_(std::move(var_infos_list)),
places_(std::move(places)) {}
FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
const std::vector<std::string> &fetch_tensors) {
if (drop_scope_counter_ == 0) {
// Create local scopes.
for (auto it = local_scopes_.rbegin(); it != local_scopes_.rend(); ++it) {
auto &scope = *it;
for (size_t i = 0; i < local_scopes_.size(); ++i) {
auto &scope = local_scopes_[i];
Scope &local_scope = scope->NewScope();
*scope->Var(details::kLocalExecScopeName)->GetMutable<Scope *>() =
&local_scope;
for (auto &info : var_infos_) {
if (scope->FindVar(info.name_) != nullptr) {
continue;
}
if (info.persistable_) { // Persistable
InitializeVariable(scope->Var(info.name_), info.type_);
} else {
InitializeVariable(local_scope.Var(info.name_), info.type_);
for (auto &var_infos : var_infos_list_) {
for (auto &info : var_infos) {
if (scope->FindVar(info.name_) != nullptr) {
continue;
}
if (info.persistable_) { // Persistable
InitializeVariable(scope->Var(info.name_), info.type_);
} else {
InitializeVariable(local_scope.Var(info.name_), info.type_);
}
}
}
}
}
std::vector<framework::LoDTensor> fetch_data;
std::exception_ptr eptr;
std::exception_ptr eptr = nullptr;
try {
fetch_data = underlying_executor_->Run(fetch_tensors);
} catch (...) {
......@@ -71,9 +72,13 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
#ifdef PADDLE_WITH_CUDA
const std::string gc_name = "garbage_collector";
DeviceGarbageCollectorMap *gc =
Graph().Has(gc_name) ? &(Graph().Get<DeviceGarbageCollectorMap>(gc_name))
: nullptr;
DeviceGarbageCollectorMap *gc = nullptr;
// FIXME(Yancey1989): garbage collection fails in parallel graph mode and needs a fix.
if (strategy_.type_ != ExecutionStrategy::kParallelGraph) {
gc = Graph().Has(gc_name)
? &(Graph().Get<DeviceGarbageCollectorMap>(gc_name))
: nullptr;
}
#endif
if (!fetch_tensors.empty() ||
......
......@@ -38,7 +38,8 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
public:
ScopeBufferedSSAGraphExecutor(
ExecutionStrategy strategy, std::vector<Scope*> local_scopes,
std::vector<VariableInfo> var_infos, std::vector<platform::Place> places,
std::vector<std::vector<VariableInfo>> var_info_list,
std::vector<platform::Place> places,
std::unique_ptr<SSAGraphExecutor>&& underlying_executor);
const ir::Graph& Graph() const override {
......@@ -53,7 +54,7 @@ class ScopeBufferedSSAGraphExecutor : public SSAGraphExecutor {
ExecutionStrategy strategy_;
std::unique_ptr<SSAGraphExecutor> underlying_executor_;
std::vector<Scope*> local_scopes_;
std::vector<VariableInfo> var_infos_;
std::vector<std::vector<VariableInfo>> var_infos_list_;
std::vector<platform::Place> places_;
};
} // namespace details
......
......@@ -24,6 +24,7 @@
#include <functional>
#include "ThreadPool.h" // ThreadPool in thrird party
#include "paddle/fluid/framework/blocking_queue.h"
#include "paddle/fluid/framework/details/computation_op_handle.h"
#include "paddle/fluid/framework/details/exception_holder.h"
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/details/fetch_op_handle.h"
......
......@@ -20,7 +20,7 @@ namespace details {
VarHandleBase::~VarHandleBase() {}
VarHandle::~VarHandle() { VLOG(4) << "deleting var handle " << DebugString(); }
VarHandle::~VarHandle() { VLOG(5) << "deleting var handle " << DebugString(); }
std::string VarHandle::DebugString() const {
std::stringstream ss;
......
......@@ -26,6 +26,7 @@ limitations under the License. */
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/scope_buffered_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/platform/profiler.h"
......@@ -53,6 +54,7 @@ class ParallelExecutorPrivate {
std::vector<Scope *> local_scopes_;
Scope *global_scope_; // not owned
std::unique_ptr<details::SSAGraphExecutor> executor_;
std::vector<std::unique_ptr<details::SSAGraphExecutor>> executors_;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
std::unique_ptr<platform::NCCLContextMap> nccl_ctxs_;
......@@ -84,6 +86,9 @@ ParallelExecutor::ParallelExecutor(
PADDLE_ENFORCE(places.size() > 1,
"If you set build_strategy.reduce with 'Reduce',"
"the number of places must be greater than 1.");
PADDLE_ENFORCE(exec_strategy.type_ != ExecutionStrategy::kParallelGraph,
"You should set build_strategy.reduce with 'AllReduce' for "
"ParallelGraph executor type");
}
// Step 1. Bcast the params to devs.
......@@ -106,31 +111,55 @@ ParallelExecutor::ParallelExecutor(
// Bcast Parameters to all GPUs
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
auto *nccl_id_var = scope->FindVar(NCCL_ID_VARNAME);
ncclUniqueId *nccl_id = nullptr;
std::unique_ptr<ncclUniqueId> nccl_id = nullptr;
bool need_group_call = true;
if (nccl_id_var != nullptr) {
nccl_id = nccl_id_var->GetMutable<ncclUniqueId>();
// Copy the id out of the scope-owned variable; resetting the unique_ptr
// directly onto GetMutable's pointer would double-free the scope's storage.
nccl_id.reset(new ncclUniqueId(*nccl_id_var->GetMutable<ncclUniqueId>()));
} else if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
nccl_id.reset(new ncclUniqueId());
PADDLE_ENFORCE(platform::dynload::ncclGetUniqueId(nccl_id.get()));
*member_->global_scope_->Var(NCCL_ID_VARNAME)
->GetMutable<ncclUniqueId>() = *nccl_id.get();
need_group_call = false;
} else {
// init nccl_id in NCCLContextMap
}
member_->nccl_ctxs_.reset(new platform::NCCLContextMap(
member_->places_, nccl_id, num_trainers, trainer_id));
member_->places_, nccl_id.get(), num_trainers, trainer_id,
need_group_call));
#else
PADDLE_THROW("Not compiled with CUDA");
#endif
}
if (member_->local_scopes_.size() != 1 && local_scopes.empty()) {
BCastParamsToDevices(bcast_vars);
}
// Startup Program has been run. All local scopes have correct parameters.
// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
std::vector<std::unique_ptr<ir::Graph>> graphs;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, member_->places_, loss_var_name, params,
member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
for (size_t i = 0; i < member_->places_.size(); ++i) {
std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, {member_->places_[i]}, loss_var_name, params,
{member_->local_scopes_[i]}, member_->use_cuda_,
member_->nccl_ctxs_.get());
graphs.push_back(std::move(graph));
}
} else {
std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, member_->places_, loss_var_name, params,
member_->local_scopes_, member_->use_cuda_, member_->nccl_ctxs_.get());
graphs.push_back(std::move(graph));
}
auto max_memory_size = GetEagerDeletionThreshold();
if (max_memory_size >= 0) {
// FIXME(Yancey1989): eager deletion does not yet work in parallel graph mode.
if (max_memory_size >= 0 &&
exec_strategy.type_ != ExecutionStrategy::kParallelGraph) {
for (auto &place : member_->places_) {
if (!platform::is_gpu_place(place)) continue;
auto gpu_place = boost::get<platform::CUDAPlace>(place);
......@@ -143,40 +172,48 @@ ParallelExecutor::ParallelExecutor(
}
}
if (!gcs_.empty()) {
auto ref_cnt_pass =
ir::PassRegistry::Instance().Get("reference_count_pass");
ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, &ref_cnts_);
ref_cnt_pass->SetNotOwned(details::kCurReferenceCount, &cur_ref_cnts_);
ref_cnt_pass->SetNotOwned(details::kGarbageCollector, &gcs_);
graph = ref_cnt_pass->Apply(std::move(graph));
graph->SetNotOwned("garbage_collector", &gcs_);
for (size_t i = 0; i < graphs.size(); ++i) {
auto ref_cnt_pass =
ir::PassRegistry::Instance().Get("reference_count_pass");
ref_cnt_pass->SetNotOwned(details::kGlobalReferenceCount, &ref_cnts_);
ref_cnt_pass->SetNotOwned(details::kCurReferenceCount, &cur_ref_cnts_);
ref_cnt_pass->SetNotOwned(details::kGarbageCollector, &gcs_);
// Index with i (not 0) so every graph gets the pass applied and its
// garbage collector attached.
graphs[i] = ref_cnt_pass->Apply(std::move(graphs[i]));
graphs[i]->SetNotOwned("garbage_collector", &gcs_);
}
}
}
#else
std::unique_ptr<ir::Graph> graph =
build_strategy.Apply(main_program, member_->places_, loss_var_name,
params, member_->local_scopes_, member_->use_cuda_);
graphs.push_back(std::move(graph));
#endif
// Step 3. Create vars in each scope. Passes may also create new vars.
// skip control vars and empty vars
std::vector<details::VariableInfo> var_infos;
for (auto &node : graph->Nodes()) {
if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
var_infos.emplace_back();
var_infos.back().name_ = node->Var()->Name();
var_infos.back().type_ = node->Var()->GetType();
var_infos.back().persistable_ = node->Var()->Persistable();
std::vector<std::vector<details::VariableInfo>> var_infos_list;
for (size_t i = 0; i < graphs.size(); ++i) {
std::vector<details::VariableInfo> var_infos;
for (auto &node : graphs[i]->Nodes()) {
if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
var_infos.emplace_back();
var_infos.back().name_ = node->Var()->Name();
var_infos.back().type_ = node->Var()->GetType();
var_infos.back().persistable_ = node->Var()->Persistable();
}
}
var_infos_list.emplace_back(std::move(var_infos));
}
// If loss_var_name is given, there should be exactly one graph.
if (loss_var_name.size()) {
size_t graph_num = ir::GraphNum(*graph);
size_t graph_num = ir::GraphNum(*graphs[0]);
if (graph_num > 1) {
LOG(WARNING)
<< "The number of graph should be only one, "
"but the current graph has "
<< ir::GraphNum(*graph)
<< ir::GraphNum(*graphs[0])
<< " sub_graphs. If you want to see the nodes of the "
"sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
"to specify the output dir. NOTES: if you not do training, "
......@@ -185,15 +222,42 @@ ParallelExecutor::ParallelExecutor(
}
if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
/**
for (size_t i = 0; i < member_->places_.size(); ++i) {
std::vector<details::VariableInfo> var_infos;
for (auto &node : graphs[i]->Nodes()) {
if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
var_infos.emplace_back();
var_infos.back().name_ = node->Var()->Name();
var_infos.back().type_ = node->Var()->GetType();
var_infos.back().persistable_ = node->Var()->Persistable();
}
}
std::vector<platform::Place> places = {member_->places_[i]};
std::vector<framework::Scope *> scopes = {member_->local_scopes_[i]};
std::unique_ptr<details::ThreadedSSAGraphExecutor> p(new
details::ThreadedSSAGraphExecutor(
exec_strategy, scopes, places, std::move(graphs[i])));
member_->executors_.push_back(std::move(p));
member_->executors_[i].reset(new details::ScopeBufferedSSAGraphExecutor(
exec_strategy, scopes, std::move(var_infos), places,
std::move(member_->executors_[i])));
}**/
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, places, std::move(graph)));
exec_strategy, member_->local_scopes_, places, std::move(graphs[0])));
} else if (exec_strategy.type_ == ExecutionStrategy::kParallelGraph) {
member_->executor_.reset(new details::ParallelSSAGraphExecutor(
exec_strategy, member_->local_scopes_, places, std::move(graphs)));
} else {
member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, places, std::move(graph)));
exec_strategy, member_->local_scopes_, places, std::move(graphs[0])));
}
member_->executor_.reset(new details::ScopeBufferedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, std::move(var_infos),
exec_strategy, member_->local_scopes_, std::move(var_infos_list),
member_->places_, std::move(member_->executor_)));
}
......
......@@ -20,6 +20,8 @@ limitations under the License. */
#include <unordered_set>
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/details/build_strategy.h"
#include "paddle/fluid/framework/details/execution_strategy.h"
#include "paddle/fluid/framework/executor.h"
......
......@@ -58,7 +58,10 @@ int64_t GetEagerDeletionThreshold() {
(static_cast<int64_t>(1) << 30));
}
Scope::~Scope() { DropKids(); }
Scope::~Scope() {
VLOG(5) << "~Scope()";
DropKids();
}
Scope& Scope::NewScope() const {
SCOPE_LOCK_GUARD
......
......@@ -48,9 +48,18 @@ void ThreadPool::Init() {
ThreadPool::ThreadPool(int num_threads) : running_(true) {
threads_.resize(num_threads);
for (auto& thread : threads_) {
for (int i = 0; i < num_threads; ++i) {
// for (auto& thread : threads_) {
// TODO(Yancey1989): bind each thread to a specific CPU core
thread.reset(new std::thread(std::bind(&ThreadPool::TaskLoop, this)));
threads_[i].reset(
new std::thread(std::bind(&ThreadPool::TaskLoop, this, i)));
/**
sched_param sch;
int policy;
pthread_getschedparam(threads_[i]->native_handle(), &policy, &sch);
if (pthread_setschedparam(threads_[i]->native_handle(), SCHED_FIFO, &sch)) {
VLOG(1) << "Failed to setschedparam: " << errno;
}**/
}
}
......@@ -68,7 +77,7 @@ ThreadPool::~ThreadPool() {
}
}
void ThreadPool::TaskLoop() {
void ThreadPool::TaskLoop(int i) {
while (true) {
Task task;
......@@ -89,7 +98,6 @@ void ThreadPool::TaskLoop() {
task = std::move(tasks_.front());
tasks_.pop();
}
// run the task
task();
}
......
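The commented-out sched_param experiment and the TODO above hint at pinning pool threads to cores; a minimal Linux-only sketch of such a binding (an assumption, not part of this diff) using the standard pthread affinity API:

    #define _GNU_SOURCE  // for cpu_set_t / pthread_setaffinity_np (implicit under g++)
    #include <pthread.h>
    #include <sched.h>
    #include <thread>

    // Pin thread t to CPU core core_id; returns 0 on success.
    int PinToCore(std::thread &t, int core_id) {
      cpu_set_t cpuset;
      CPU_ZERO(&cpuset);
      CPU_SET(core_id, &cpuset);
      return pthread_setaffinity_np(t.native_handle(), sizeof(cpu_set_t), &cpuset);
    }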
......@@ -14,6 +14,7 @@ limitations under the License. */
#pragma once
#include <pthread.h>
#include <condition_variable> // NOLINT
#include <functional>
#include <future> // NOLINT
......@@ -27,7 +28,6 @@ limitations under the License. */
namespace paddle {
namespace framework {
struct ExceptionHandler {
mutable std::future<std::unique_ptr<platform::EnforceNotMet>> future_;
explicit ExceptionHandler(
......@@ -99,7 +99,7 @@ class ThreadPool {
// The constructor starts threads to run TaskLoop, which retrieves
// and runs tasks from the queue.
void TaskLoop();
void TaskLoop(int i);
// Init is called by GetInstance.
static void Init();
......
......@@ -59,3 +59,47 @@ TEST(ThreadPool, ConcurrentRun) {
}
EXPECT_EQ(sum, ((n + 1) * n) / 2);
}
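// Returns the current wall-clock time in microseconds.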
static int64_t GetTS() {
struct timeval tp;
gettimeofday(&tp, NULL);
return tp.tv_sec * 1000000 + tp.tv_usec;
}
void multi_call(std::function<void()> call) {
for (int i = 0; i < 500; ++i) {
call();
}
}
TEST(ThreadPool, PERFORMANCE) {
auto sum = [] {
int a = 0;
for (int i = 0; i < 1000; ++i) {
a += i;
}
};
// framework::ThreadPool *pool = new framework::ThreadPool(2);
int64_t start = GetTS();
for (int i = 0; i < 1000; ++i) {
// int64_t s = GetTS();
framework::Async(std::move(sum));
// pool->Run(std::move(sum));
// VLOG(5) << "push to pool spent : " << GetTS() - s << " (us).";
}
VLOG(5) << "pool spent: " << GetTS() - start << " (us).";
start = GetTS();
for (int i = 0; i < 1000; ++i) {
sum();
}
VLOG(5) << "sequence call spent: " << GetTS() - start << " (us).";
std::vector<std::thread> threads;
start = GetTS();
for (int i = 0; i < 2; ++i) {
std::thread t(multi_call, std::ref(sum));
threads.push_back(std::move(t));
}
for (auto& thread : threads) {
thread.join();
}
VLOG(5) << "two threads spent: " << GetTS() - start << " (us).";
}
......@@ -67,9 +67,12 @@ class BlockingQueue {
}
bool Receive(T* elem) {
VLOG(1) << "blocking queue::Receive ...";
std::unique_lock<std::mutex> lock(mutex_);
receive_cv_.wait(lock, [&] { return !queue_.empty() || closed_; });
VLOG(1) << "queue_.empty()=" << queue_.empty();
if (!queue_.empty()) {
if (elem == nullptr) VLOG(1) << "elem is nullptr";
PADDLE_ENFORCE_NOT_NULL(elem);
*elem = queue_.front();
if (LIKELY(!speed_test_mode_)) {
......
......@@ -58,7 +58,9 @@ void BufferedReader::ReadAsync(size_t i) {
TensorVec &gpu = gpu_buffer_[i];
gpu.resize(cpu.size());
for (size_t i = 0; i < cpu.size(); ++i) {
VLOG(1) << "launch tensor copy from cpu to cpu, idx: " << i;
framework::TensorCopySync(cpu[i], place_, &gpu[i]);
VLOG(1) << "done " << i;
gpu[i].set_lod(cpu[i].lod());
}
}
......@@ -80,11 +82,13 @@ void BufferedReader::StartImpl() {
}
void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
VLOG(1) << "ReadNextImpl start on place: " << place_;
if (position_.empty()) {
out->clear();
return;
}
size_t i = position_.front().get();
VLOG(1) << "position front: " << i;
position_.pop();
if (i == -1UL) {
......@@ -101,6 +105,7 @@ void BufferedReader::ReadNextImpl(std::vector<framework::LoDTensor> *out) {
ReadAsync(prev_pos_);
}
prev_pos_ = i;
VLOG(1) << "success ReadNextImpl";
}
} // namespace reader
......
......@@ -25,9 +25,15 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
private:
void RunImpl(const framework::Scope& scope,
const platform::Place& dev_place) const override {
auto* out = scope.FindVar(Output("Out"))
->template GetMutable<framework::ReaderHolder>();
VLOG(1) << "find var in scope: " << &scope;
auto* out_var = scope.FindVar(Output("Out"));
VLOG(1) << "var " << Output("Out") << " -> " << out_var;
auto* out = out_var->GetMutable<framework::ReaderHolder>();
// auto* out = scope.Var(Output("Out"))
// ->template GetMutable<framework::ReaderHolder>();
if (out->Get() != nullptr) {
VLOG(1) << Output("Out") << " is not nullptr.";
return;
}
const auto& underlying_reader = scope.FindVar(Input("UnderlyingReader"))
......@@ -46,9 +52,11 @@ class CreateDoubleBufferReaderOp : public framework::OperatorBase {
sin >> num;
place = platform::CUDAPlace(static_cast<int>(num));
}
VLOG(1) << "create buffered reader on " << place;
out->Reset(framework::MakeDecoratedReader<BufferedReader>(underlying_reader,
place, 2));
VLOG(1) << "Reset Buffered Reader in var: "
<< scope.FindVar(Input("UnderlyingReader"));
}
};
......
......@@ -28,8 +28,10 @@ class PyReader : public framework::FileReader {
}
void ReadNext(std::vector<framework::LoDTensor>* out) override {
VLOG(1) << "come in PyReader::ReadNext function, out: " << out;
bool success;
*out = queue_->Pop(&success);
VLOG(1) << "call PyReader::ReadNext " << success;
if (!success) out->clear();
}
......
......@@ -115,10 +115,12 @@ class PreemptiveReaderContainer : public IReaderContainer {
}
void ReadNext(std::vector<framework::LoDTensor>* out) override {
VLOG(1) << "flag";
if (!pending_.empty()) {
auto future_it = complete_queue_.Pop();
FutureItem item = future_it->get();
if (item.exception_) {
VLOG(1) << "item has exception!!!";
for (auto it = futures_.begin(); it != futures_.end(); ++it) {
if (it != future_it) {
it->wait(); // Wait all other threads complete.
......
......@@ -82,12 +82,15 @@ struct NCCLContext {
struct NCCLContextMap {
std::unordered_map<int, NCCLContext> contexts_;
std::vector<int> order_;
bool need_group_call_;
explicit NCCLContextMap(const std::vector<platform::Place> &places,
ncclUniqueId *nccl_id = nullptr,
size_t num_trainers = 1, size_t trainer_id = 0) {
size_t num_trainers = 1, size_t trainer_id = 0,
bool need_group_call = true) {
PADDLE_ENFORCE(!places.empty());
order_.reserve(places.size());
need_group_call_ = need_group_call;
for (auto &p : places) {
int dev_id = boost::get<CUDAPlace>(p).device;
order_.emplace_back(dev_id);
......@@ -102,7 +105,7 @@ struct NCCLContextMap {
}
std::unique_ptr<ncclComm_t[]> comms(new ncclComm_t[order_.size()]);
// if num_trainers == 1, should create a new nccl id for local comms.
if (num_trainers == 1) {
if (num_trainers == 1 && nccl_id != nullptr) {
std::lock_guard<std::mutex> guard(NCCLGroupGuard::NCCLMutex());
PADDLE_ENFORCE(platform::dynload::ncclCommInitAll(
comms.get(), static_cast<int>(order_.size()), order_.data()));
......
......@@ -12,9 +12,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/platform/port.h"
#include <algorithm>
#include <iomanip>
#include <limits>
......@@ -25,9 +22,12 @@ limitations under the License. */
#ifdef PADDLE_WITH_CUDA
#include <cuda.h>
#endif // PADDLE_WITH_CUDA
#include "glog/logging.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/platform/device_tracer.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/string/printf.h"
DEFINE_bool(enable_rpc_profiler, false, "Enable rpc profiler or not.");
......@@ -173,8 +173,9 @@ void PopEvent(const std::string& name, const DeviceContext* dev_ctx) {
RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
: is_enabled_(false), start_ns_(PosixInNsec()) {
std::lock_guard<std::mutex> l(profiler_mu);
if (g_state == ProfilerState::kDisabled) return;
std::lock_guard<std::mutex> l(profiler_mu);
is_enabled_ = true;
dev_ctx_ = dev_ctx;
name_ = name;
......@@ -184,8 +185,9 @@ RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
}
RecordEvent::~RecordEvent() {
std::lock_guard<std::mutex> l(profiler_mu);
if (g_state == ProfilerState::kDisabled || !is_enabled_) return;
VLOG(5) << "call ~RecordEvent";
std::lock_guard<std::mutex> l(profiler_mu);
DeviceTracer* tracer = GetDeviceTracer();
if (tracer) {
tracer->AddCPURecords(CurAnnotation(), start_ns_, PosixInNsec(),
......
......@@ -720,6 +720,11 @@ All parameter, weight, gradient are variables in Paddle.
)DOC");
py::enum_<ExecutionStrategy::ExecutorType>(exec_strategy, "ExecutorType")
.value("Default", ExecutionStrategy::ExecutorType::kDefault)
.value("Experimental", ExecutionStrategy::ExecutorType::kExperimental)
.value("ParallelGraph", ExecutionStrategy::ExecutorType::kParallelGraph);
exec_strategy.def(py::init())
.def_property(
"num_threads",
......@@ -777,17 +782,14 @@ All parameter, weight, gradient are variables in Paddle.
[](const ExecutionStrategy &self) { return self.dry_run_; },
[](ExecutionStrategy &self, bool dry_run) {
self.dry_run_ = dry_run;
});
exec_strategy.def_property(
"use_experimental_executor",
[](const ExecutionStrategy &self) {
return self.type_ == ExecutionStrategy::kExperimental;
},
[](ExecutionStrategy &self, bool experimental) {
self.type_ = experimental ? ExecutionStrategy::kExperimental
: ExecutionStrategy::kDefault;
});
})
.def_property(
"executor_type",
[](const ExecutionStrategy &self) { return self.type_; },
[](ExecutionStrategy &self, ExecutionStrategy::ExecutorType type) {
self.type_ = type;
},
R"DOC()DOC");
py::class_<BuildStrategy> build_strategy(pe, "BuildStrategy", R"DOC(
BuildStrategy allows the user to more precisely control how to
......