提交 31a05d3e 编写于 作者: Q Qiao Longfei

Merge branch 'develop' of https://github.com/PaddlePaddle/Paddle into add-async-ssa-graph-executor

test=develop
......@@ -427,7 +427,7 @@ paddle.fluid.optimizer.MomentumOptimizer.__init__ ArgSpec(args=['self', 'learnin
paddle.fluid.optimizer.MomentumOptimizer.apply_gradients ArgSpec(args=['self', 'params_grads'], varargs=None, keywords=None, defaults=None)
paddle.fluid.optimizer.MomentumOptimizer.backward ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set', 'callbacks'], varargs=None, keywords=None, defaults=(None, None, None, None))
paddle.fluid.optimizer.MomentumOptimizer.minimize ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None))
paddle.fluid.optimizer.AdagradOptimizer.__init__ ArgSpec(args=['self', 'learning_rate', 'epsilon', 'regularization', 'name'], varargs=None, keywords=None, defaults=(1e-06, None, None))
paddle.fluid.optimizer.AdagradOptimizer.__init__ ArgSpec(args=['self', 'learning_rate', 'epsilon', 'regularization', 'name', 'initial_accumulator_value'], varargs=None, keywords=None, defaults=(1e-06, None, None, 0.0))
paddle.fluid.optimizer.AdagradOptimizer.apply_gradients ArgSpec(args=['self', 'params_grads'], varargs=None, keywords=None, defaults=None)
paddle.fluid.optimizer.AdagradOptimizer.backward ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set', 'callbacks'], varargs=None, keywords=None, defaults=(None, None, None, None))
paddle.fluid.optimizer.AdagradOptimizer.minimize ArgSpec(args=['self', 'loss', 'startup_program', 'parameter_list', 'no_grad_set'], varargs=None, keywords=None, defaults=(None, None, None))
......
......@@ -30,8 +30,6 @@ namespace paddle {
namespace framework {
namespace details {
static constexpr char kAllOpDescs[] = "all_op_descs";
VarHandle* GetValidInput(const OpHandleBase* a) {
for (auto p : a->Inputs()) {
VarHandle* b = dynamic_cast<VarHandle*>(p);
......
......@@ -53,7 +53,7 @@ AllReduceOpHandle::AllReduceOpHandle(ir::Node *node,
#endif
void AllReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
platform::RecordEvent record_event(Name());
WaitInputVarGenerated();
auto in_var_handles = DynamicCast<VarHandle>(this->Inputs());
......
......@@ -22,7 +22,7 @@ namespace framework {
namespace details {
void BroadcastOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second);
platform::RecordEvent record_event(Name());
if (places_.size() == 1) return;
......
......@@ -34,9 +34,11 @@ namespace details {
static inline bool SeqOnlyAllReduceOps(const BuildStrategy &strategy) {
// Should fix the allreduce op order if scheduling
// them in multiple threads or processes to avoid hang.
// NOTE: ParallelGraph would execute this pass on each graph, so
// don't need to append it here.
return (!strategy.enable_sequential_execution_ &&
strategy.num_trainers_ > 1) ||
strategy.enable_parallel_graph_;
strategy.num_trainers_ > 1) &&
!strategy.enable_parallel_graph_;
}
class ParallelExecutorPassBuilder : public ir::PassBuilder {
......
......@@ -22,7 +22,7 @@ namespace framework {
namespace details {
void FusedBroadcastOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.begin()->second);
platform::RecordEvent record_event(Name());
if (places_.size() == 1UL) return;
......
......@@ -29,8 +29,6 @@ namespace paddle {
namespace framework {
namespace details {
constexpr char kAllOpDescs[] = "all_op_descs";
std::vector<ir::Node*> SortOpLikeDescOrder(const ir::Graph& graph);
// NOTE(dzh): A ordered set for node reuse in memory optimize.
......
......@@ -412,20 +412,32 @@ void MultiDevSSAGraphBuilderBase::CreateComputationalOp(ir::Graph *result,
void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(
ir::Graph *result, const std::string &og) const {
OpHandleBase *op_handle = nullptr;
auto append_allreduce_op = [&](
const std::vector<Scope *> &scopes,
const std::vector<platform::Place> &places) -> OpHandleBase * {
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
result->Get<GraphOps>(kGraphOps).emplace_back(new AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
local_scopes_, places_, nccl_ctxs_));
scopes, places, nccl_ctxs_));
#else
result->Get<GraphOps>(kGraphOps).emplace_back(new AllReduceOpHandle(
result->CreateEmptyNode("allreduce", ir::Node::Type::kOperation),
local_scopes_, places_));
scopes, places));
#endif
auto *op_handle = result->Get<GraphOps>(kGraphOps).back();
return result->Get<GraphOps>(kGraphOps).back();
};
if (!strategy_.enable_parallel_graph_)
op_handle = append_allreduce_op(local_scopes_, places_);
for (size_t i = 0; i < places_.size(); ++i) {
auto &p = places_[i];
SetCommunicationContext(op_handle, p);
if (strategy_.enable_parallel_graph_) {
op_handle = append_allreduce_op({local_scopes_[i]}, {places_[i]});
}
SetCommunicationContext(op_handle, places_[i]);
auto &vars = result->Get<GraphVars>(kGraphVars)[i][og];
PADDLE_ENFORCE(!vars.empty());
auto &prev_grad = vars.back();
......@@ -433,7 +445,7 @@ void MultiDevSSAGraphBuilderBase::CreateAllReduceOp(
auto var =
new VarHandle(result->CreateEmptyNode(og, ir::Node::Type::kVariable),
vars.size(), i, og, p);
vars.size(), i, og, places_[i]);
vars.emplace_back(var);
op_handle->AddOutput(var);
}
......
......@@ -36,13 +36,14 @@ namespace details {
// map from variable name to variables. The variables, who have the same name,
// will have a differsent version. The offset in the
// `std::vector<VarHandle*>` is the version of varaibles.
typedef std::vector<std::unordered_map<std::string, std::vector<VarHandle*>>>
typedef std::vector<std::unordered_map<std::string, std::vector<VarHandle *>>>
GraphVars;
const char kGraphVars[] = "vars";
// aux variables to represent dependency. Useful to resolve data hazard.
typedef std::unordered_set<VarHandleBase*> GraphDepVars;
typedef std::unordered_set<VarHandleBase *> GraphDepVars;
const char kGraphDepVars[] = "dep_vars";
} // namespace details
} // namespace framework
} // namespace paddle
......@@ -70,6 +70,9 @@ class OpHandleBase {
auto it = dev_ctxes_.find(place);
return it != dev_ctxes_.end() ? it->second : nullptr;
}
const std::map<platform::Place, platform::DeviceContext *> &DeviceContext() {
return dev_ctxes_;
}
void SetDeviceContext(platform::Place place, platform::DeviceContext *ctx_) {
dev_ctxes_[place] = ctx_;
......
......@@ -13,22 +13,92 @@
// limitations under the License.
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
#include "paddle/fluid/framework/ir/graph_helper.h"
namespace paddle {
namespace framework {
namespace details {
std::vector<std::unique_ptr<ir::Graph>>
ParallelSSAGraphExecutor::SeparateMultiDevicesGraph(
std::unique_ptr<ir::Graph> &&graph) {
std::vector<std::unique_ptr<ir::Graph>> graphs;
graphs.reserve(places_.size());
for (size_t i = 0; i < places_.size(); ++i) {
ProgramDesc empty;
graphs.emplace_back(std::unique_ptr<ir::Graph>(new ir::Graph(empty)));
auto &g = graphs.back();
g->Set(kGraphVars, new GraphVars(1UL));
g->Set(kGraphDepVars, new GraphDepVars);
}
auto op_handles = ir::FilterByNodeWrapper<OpHandleBase>(*graph);
for (auto &op : op_handles) {
auto &dev_ctx = op->DeviceContext();
auto &p = dev_ctx.begin()->first;
int dev_id = boost::get<platform::CUDAPlace>(p).device;
auto &dev_dummys = graphs[dev_id]->Get<GraphDepVars>(kGraphDepVars);
graphs[dev_id]->AddNode(graph->RemoveNode(op->Node()).release());
for (auto &var : op->Inputs()) {
auto dummy_ptr = dynamic_cast<DummyVarHandle *>(var);
if (dummy_ptr) {
dev_dummys.insert(var);
if (graph->Nodes().count(var->Node()))
graphs[dev_id]->AddNode(graph->RemoveNode(var->Node()).release());
}
}
for (auto &var : op->Outputs()) {
auto dummy_ptr = dynamic_cast<DummyVarHandle *>(var);
if (dummy_ptr) {
dev_dummys.insert(var);
if (graph->Nodes().count(var->Node()))
graphs[dev_id]->AddNode(graph->RemoveNode(var->Node()).release());
}
}
}
for (size_t dev_id = 0; dev_id < places_.size(); ++dev_id) {
auto &dev_vars = graphs[dev_id]->Get<GraphVars>(kGraphVars)[0];
auto &origin_vars = graph->Get<GraphVars>(kGraphVars)[dev_id];
for (auto &name_pair : origin_vars) {
dev_vars.emplace(name_pair.first, name_pair.second);
for (auto &version_pair : name_pair.second) {
if (graph->Nodes().count(version_pair->Node())) {
graphs[dev_id]->AddNode(
graph->RemoveNode(version_pair->Node()).release());
}
}
}
}
return graphs;
}
ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
const ExecutionStrategy &strategy, const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::vector<std::unique_ptr<ir::Graph>> &&graphs)
const framework::ProgramDesc &main_prog, std::unique_ptr<ir::Graph> &&graph)
: strategy_(std::move(strategy)),
local_scopes_(std::move(local_scopes)),
pool_(places.size() >= 2 ? new ::ThreadPool(places.size()) : nullptr),
places_(std::move(places)),
graphs_(std::move(graphs)) {
main_prog_(main_prog),
// TODO(Yancey1989): Copying graphs is not safely since it deleted the
// attrs.
graphs_(SeparateMultiDevicesGraph(std::move(graph))) {
PADDLE_ENFORCE_EQ(places_.size(), local_scopes_.size());
auto seq_allreduce_pass =
ir::PassRegistry::Instance().Get("all_reduce_deps_pass");
seq_allreduce_pass->Erase(details::kAllOpDescs);
seq_allreduce_pass->Set<const std::vector<OpDesc *>>(
details::kAllOpDescs,
new std::vector<OpDesc *>(main_prog_.Block(0).AllOps()));
for (size_t i = 0; i < graphs_.size(); ++i) {
graphs_[i] = seq_allreduce_pass->Apply(std::move(graphs_[i]));
}
// set the correct size of thread pool to each device.
strategy_.num_threads_ = strategy_.num_threads_ < places_.size()
? 1UL
......@@ -37,7 +107,7 @@ ParallelSSAGraphExecutor::ParallelSSAGraphExecutor(
<< " to run the operators of the graph on each device.";
for (size_t i = 0; i < places.size(); ++i) {
executors_.emplace_back(new details::ThreadedSSAGraphExecutor(
strategy_, {local_scopes_[i]}, {places_[i]}, std::move(graphs_[i])));
strategy_, local_scopes_, {places_[i]}, std::move(graphs_.at(i))));
}
}
......
......@@ -18,7 +18,9 @@
#include <vector>
#include "ThreadPool.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/ir/graph.h"
namespace paddle {
namespace framework {
......@@ -29,17 +31,23 @@ class ParallelSSAGraphExecutor : public SSAGraphExecutor {
ParallelSSAGraphExecutor(const ExecutionStrategy &strategy,
const std::vector<Scope *> &local_scopes,
const std::vector<platform::Place> &places,
std::vector<std::unique_ptr<ir::Graph>> &&graphs);
const framework::ProgramDesc &main_prog,
std::unique_ptr<ir::Graph> &&graph);
~ParallelSSAGraphExecutor() final = default;
const ir::Graph &Graph() const override { return *graphs_[0]; }
FeedFetchList Run(const std::vector<std::string> &fetch_tensors) override;
private:
std::vector<std::unique_ptr<ir::Graph>> SeparateMultiDevicesGraph(
std::unique_ptr<ir::Graph> &&graph);
ExecutionStrategy strategy_;
std::vector<Scope *> local_scopes_;
std::unique_ptr<::ThreadPool> pool_{nullptr};
std::vector<platform::Place> places_;
framework::ProgramDesc main_prog_;
std::vector<std::unique_ptr<ir::Graph>> graphs_;
std::vector<std::unique_ptr<details::ThreadedSSAGraphExecutor>> executors_;
......
......@@ -139,7 +139,7 @@ void ReduceOpHandle::GatherSelectedRows(
#endif
void ReduceOpHandle::RunImpl() {
platform::RecordEvent record_event(Name(), dev_ctxes_.cbegin()->second);
platform::RecordEvent record_event(Name());
if (places_.size() == 1) return;
// the input and output may have dummy var.
......
......@@ -63,7 +63,7 @@ FeedFetchList ScopeBufferedSSAGraphExecutor::Run(
eptr = std::current_exception();
}
platform::RecordEvent e("ScopeBufferedSSAGraphExecutorAfterRun", nullptr);
platform::RecordEvent e("ScopeBufferedSSAGraphExecutorAfterRun");
++drop_scope_counter_;
bool stream_end = false;
......
......@@ -50,7 +50,7 @@ ThreadedSSAGraphExecutor::ThreadedSSAGraphExecutor(
inline FeedFetchList ThreadedSSAGraphExecutor::RunImpl(
const std::vector<std::string> &fetch_tensors) {
std::unique_ptr<platform::RecordEvent> event(
new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare", nullptr));
new platform::RecordEvent("ThreadedSSAGraphExecutorPrepare"));
std::unordered_map<OpHandleBase *, size_t> pending_ops;
std::unordered_set<VarHandleBase *> pending_vars;
auto ready_vars = std::make_shared<BlockingQueue<VarHandleBase *>>();
......@@ -240,7 +240,7 @@ void ThreadedSSAGraphExecutor::RunOp(
VLOG(10) << op << " " << op->Name() << " Done ";
running_ops_--;
ready_var_q->Extend(op->Outputs());
VLOG(10) << op << " " << op->Name() << "Signal posted";
VLOG(10) << op << " " << op->Name() << " Signal posted";
} catch (...) {
exception_holder_.Catch(std::current_exception());
}
......
......@@ -26,6 +26,14 @@ limitations under the License. */
namespace paddle {
namespace framework {
namespace details {
// This attr is not recommended, because the graph should not dependence
// the program once it is built.
constexpr char kAllOpDescs[] = "all_op_descs";
} // namespace details
namespace ir {
/*
......@@ -168,10 +176,13 @@ class Graph {
return ret;
}
void RemoveNode(ir::Node *node) {
std::unique_ptr<ir::Node> RemoveNode(ir::Node *node) {
PADDLE_ENFORCE(node_set_.find(node) != node_set_.end());
node_set_.erase(node);
std::unique_ptr<ir::Node> ret;
ret.reset(nodes_.at(node).release());
nodes_.erase(node);
node_set_.erase(node);
return ret;
}
// NOTE low performance, but simple and secure.
......@@ -184,13 +195,6 @@ class Graph {
return nullptr;
}
void ResolveHazard(
const std::map<std::string, std::vector<ir::Node *>> &var_nodes);
private:
std::map<std::string, std::vector<ir::Node *>> InitFromProgram(
const ProgramDesc &program);
// This method takes ownership of `node`.
ir::Node *AddNode(ir::Node *node) {
PADDLE_ENFORCE(node_set_.find(node) == node_set_.end());
......@@ -199,6 +203,13 @@ class Graph {
return node;
}
void ResolveHazard(
const std::map<std::string, std::vector<ir::Node *>> &var_nodes);
private:
std::map<std::string, std::vector<ir::Node *>> InitFromProgram(
const ProgramDesc &program);
// NOTE: program_ shouldn't be exposed to user.
const ProgramDesc program_;
std::map<std::string, boost::any> attrs_;
......
......@@ -177,9 +177,7 @@ void OperatorBase::Run(const Scope& scope, const platform::Place& place) {
// in concurrency scenerio. Here use an `if` to fix this issue.
// Please not remove the `if`, ask @Superjomn if there are any concern.
if (platform::IsProfileEnabled()) {
platform::DeviceContextPool& pool =
platform::DeviceContextPool::Instance();
platform::RecordEvent record_event(Type(), pool.Get(place));
platform::RecordEvent record_event(Type());
RunImpl(scope, place);
} else {
RunImpl(scope, place);
......
......@@ -22,6 +22,7 @@ limitations under the License. */
#include "paddle/fluid/framework/ir/graph.h"
#include "paddle/fluid/framework/details/async_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/all_reduce_deps_pass.h"
#include "paddle/fluid/framework/details/fast_threaded_ssa_graph_executor.h"
#include "paddle/fluid/framework/details/multi_devices_helper.h"
#include "paddle/fluid/framework/details/parallel_ssa_graph_executor.h"
......@@ -194,7 +195,6 @@ ParallelExecutor::ParallelExecutor(
member_->use_all_reduce_ =
build_strategy.reduce_ == BuildStrategy::ReduceStrategy::kAllReduce;
member_->nranks_ = build_strategy.num_trainers_ * places.size();
if (!member_->use_all_reduce_) {
PADDLE_ENFORCE(places.size() > 1,
"If you set build_strategy.reduce with 'Reduce',"
......@@ -222,9 +222,10 @@ ParallelExecutor::ParallelExecutor(
// choice the execution strategy.
build_strategy.enable_parallel_graph_ =
EnableParallelGraphExecution(main_program, exec_strategy, build_strategy);
VLOG(1) << "Enable ParallelGraph Execution: "
<< build_strategy.enable_parallel_graph_;
if (build_strategy.enable_parallel_graph_)
VLOG(0) << "The Executor would execute the graph by ParallelGraph "
"Execution which can get better performance,"
<< "you can force it off by env FLAGS_enable_parallel_graph=0";
if (member_->use_cuda_) {
// Bcast Parameters to all GPUs
......@@ -258,22 +259,11 @@ ParallelExecutor::ParallelExecutor(
// Step 2. Convert main_program to SSA form and dependency graph. Also, insert
// ncclOp
std::vector<std::unique_ptr<ir::Graph>> graphs;
std::unique_ptr<ir::Graph> graph;
#if defined(PADDLE_WITH_CUDA) && !defined(_WIN32)
if (build_strategy.enable_parallel_graph_) {
for (size_t i = 0; i < member_->places_.size(); ++i) {
std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, {member_->places_[i]}, loss_var_name,
{member_->local_scopes_[i]}, member_->nranks_, member_->use_cuda_,
member_->nccl_ctxs_.get());
graphs.push_back(std::move(graph));
}
} else {
std::unique_ptr<ir::Graph> graph = build_strategy.Apply(
main_program, member_->places_, loss_var_name, member_->local_scopes_,
member_->nranks_, member_->use_cuda_, member_->nccl_ctxs_.get());
graphs.push_back(std::move(graph));
}
graph = build_strategy.Apply(main_program, member_->places_, loss_var_name,
member_->local_scopes_, member_->nranks_,
member_->use_cuda_, member_->nccl_ctxs_.get());
#else
if (build_strategy.async_mode_ && !build_strategy.is_distribution_) {
for (size_t i = 0; i < member_->places_.size(); ++i) {
......@@ -293,16 +283,13 @@ ParallelExecutor::ParallelExecutor(
VLOG(10) << "Eager Deletion Threshold "
<< static_cast<float>(max_memory_size) / (1 << 30);
if (max_memory_size >= 0) {
for (size_t i = 0; i < graphs.size(); ++i) {
graphs[i] = member_->PrepareGCAndRefCnts(
std::move(graphs[i]), static_cast<size_t>(max_memory_size));
}
graph = member_->PrepareGCAndRefCnts(std::move(graph),
static_cast<size_t>(max_memory_size));
}
// Step 3. Create vars in each scope. Passes may also create new vars.
// skip control vars and empty vars
std::vector<details::VariableInfo> var_infos;
for (auto &graph : graphs) {
for (auto &node : graph->Nodes()) {
if (node->IsVar() && !node->IsCtrlVar() && node->Var()) {
var_infos.emplace_back();
......@@ -311,22 +298,22 @@ ParallelExecutor::ParallelExecutor(
var_infos.back().persistable_ = node->Var()->Persistable();
}
}
}
// If the loss_var_name is given, the number of graph should be only one.
if (loss_var_name.size()) {
size_t graph_num = ir::GraphNum(*graphs[0]);
size_t graph_num = ir::GraphNum(*graph);
if (graph_num > 1) {
LOG(WARNING)
<< "The number of graph should be only one, "
"but the current graph has "
<< ir::GraphNum(*graphs[0])
<< ir::GraphNum(*graph)
<< " sub_graphs. If you want to see the nodes of the "
"sub_graphs, you should use 'FLAGS_print_sub_graph_dir' "
"to specify the output dir. NOTES: if you not do training, "
"please don't pass loss_var_name.";
}
}
if (build_strategy.async_mode_ && !build_strategy.is_distribution_) {
VLOG(3) << "use AsyncSSAGraphExecutor";
member_->executor_.reset(new details::AsyncSSAGraphExecutor(
......@@ -334,20 +321,27 @@ ParallelExecutor::ParallelExecutor(
std::move(graphs)));
} else if (build_strategy.enable_parallel_graph_) {
VLOG(3) << "use ParallelSSAGraphExecutor";
#ifdef PADDLE_WITH_CUDA
// TODO(Yancey1989): Remove passing in the main_program when
// allreduce_seq_pass doesn't need it as the attr.
member_->executor_.reset(new details::ParallelSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graphs)));
exec_strategy, member_->local_scopes_, member_->places_, main_program,
std::move(graph)));
#else
PADDLE_THROW(
"Paddle should be compiled with CUDA for ParallelGraph Execution.");
#endif
} else {
if (exec_strategy.type_ == ExecutionStrategy::kDefault) {
VLOG(3) << "use ThreadedSSAGraphExecutor";
member_->executor_.reset(new details::ThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graphs[0])));
std::move(graph)));
} else {
VLOG(3) << "use FastThreadedSSAGraphExecutor";
member_->executor_.reset(new details::FastThreadedSSAGraphExecutor(
exec_strategy, member_->local_scopes_, member_->places_,
std::move(graphs[0])));
std::move(graph)));
}
}
......@@ -509,7 +503,6 @@ bool ParallelExecutor::EnableParallelGraphExecution(
}
if (!member_->use_all_reduce_ || !member_->use_cuda_)
enable_parallel_graph = false;
if (build_strategy.enable_sequential_execution_ ||
exec_strategy.type_ == ExecutionStrategy::ExecutorType::kExperimental)
......
......@@ -171,9 +171,7 @@ void TestInference(const std::string& dirname,
// Enable the profiler
paddle::platform::EnableProfiler(state);
{
paddle::platform::RecordEvent record_event(
"init_program",
paddle::platform::DeviceContextPool::Instance().Get(place));
paddle::platform::RecordEvent record_event("init_program");
inference_program = InitProgram(&executor, scope, dirname, is_combined);
}
......@@ -230,9 +228,7 @@ void TestInference(const std::string& dirname,
// Run repeat times to profile the performance
for (int i = 0; i < repeat; ++i) {
paddle::platform::RecordEvent record_event(
"run_inference",
paddle::platform::DeviceContextPool::Instance().Get(place));
paddle::platform::RecordEvent record_event("run_inference");
if (PrepareContext) {
// Note: if you change the inference_program, you need to call
......
......@@ -356,7 +356,7 @@ void MemInfo::Minus(const size_t &size) {
usage_ -= size;
}
uint64_t MemInfo::GetPeakUsage() { return peak_usage_; }
uint64_t MemInfo::GetPeakUsage() const { return peak_usage_; }
LegacyMemMonitor::~LegacyMemMonitor() {
for (auto &item : gpu_mem_info_) delete item.second;
......@@ -380,10 +380,10 @@ void LegacyMemMonitor::Minus(const int &device, const size_t &size) {
gpu_mem_info_[device]->Minus(size);
}
uint64_t LegacyMemMonitor::GetMemUsage(const int &device) {
uint64_t LegacyMemMonitor::GetMemUsage(const int &device) const {
return gpu_mem_info_.find(device) == gpu_mem_info_.end()
? 0
: gpu_mem_info_[device]->GetPeakUsage();
: gpu_mem_info_.at(device)->GetPeakUsage();
}
void LegacyMemMonitor::PrintMemUsage() {
......
......@@ -27,20 +27,20 @@ namespace allocation {
class MemInfo {
public:
MemInfo() : usage_(0), peak_usage_(0) {}
MemInfo(const MemInfo &) = delete;
MemInfo &operator=(const MemInfo &) = delete;
// return a flag to indicate current operation will create a peak point or not
bool Add(const size_t &);
void Minus(const size_t &);
uint64_t GetPeakUsage();
uint64_t GetPeakUsage() const;
private:
/* current memory usage*/
uint64_t usage_;
uint64_t peak_usage_;
std::mutex mutex_;
DISABLE_COPY_AND_ASSIGN(MemInfo);
};
class LegacyMemMonitor {
......@@ -56,11 +56,11 @@ class LegacyMemMonitor {
void Add(const int &, const size_t &);
void Minus(const int &, const size_t &);
uint64_t GetMemUsage(const int &);
uint64_t GetMemUsage(const int &) const;
void PrintMemUsage();
protected:
private:
MemUsage gpu_mem_info_;
};
......
......@@ -80,7 +80,7 @@ VarHandlePtr BRPCClient::AsyncSendVar(const std::string& ep,
google::protobuf::Closure* done = brpc::NewCallback(
&HandleSendResponse, cntl, response, var_h, ch_ptr, ch_ctx, this);
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
ch_ctx->stub->SendVariable(cntl, &request, response, done);
......@@ -184,7 +184,7 @@ VarHandlePtr BRPCClient::_AsyncGetVar(const std::string& ep,
google::protobuf::Closure* done = brpc::NewCallback(
&HandleGetResponse, cntl, response, var_h, ch_ptr, ch_ctx, this);
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
if (method_name == kGetMonomerRPC) {
ch_ctx->stub->GetMonomerVariable(cntl, &req, response, done);
......@@ -272,7 +272,7 @@ VarHandlePtr BRPCClient::AsyncPrefetchVar(const std::string& ep,
&cntl->request_attachment(), out_var_name_val,
false, 0, table_name_val);
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
google::protobuf::Closure* done = brpc::NewCallback(
&HandleGetResponse, cntl, response, var_h, ch_ptr, ch_ctx, this);
......@@ -311,7 +311,7 @@ VarHandlePtr BRPCClient::AsyncSendFetchBarrier(const std::string& ep,
VarHandlePtr var_h(
new VarHandle(ep, method, FETCH_BARRIER_MESSAGE, nullptr, nullptr));
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
google::protobuf::Closure* done = brpc::NewCallback(
&HandleFetchBarrierResponse, cntl, response, var_h, ch_ptr, ch_ctx, this);
......@@ -406,7 +406,7 @@ VarHandlePtr BRPCClient::AsyncSendVarMessage(
sendrecv::VoidMessage* response = new sendrecv::VoidMessage();
cntl->set_timeout_ms(time_out);
platform::RecordRPCEvent record_event(method_name, nullptr);
platform::RecordRPCEvent record_event(method_name);
VarHandlePtr var_h(
new VarHandle(ep, method_name, req.varname(), nullptr, nullptr));
......
......@@ -89,7 +89,7 @@ VarHandlePtr GRPCClient::AsyncSendVar(const std::string& ep,
// stub context
s->response_call_back_ = nullptr;
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
auto call = s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/SendVariable", req, &cq_);
......@@ -184,7 +184,7 @@ VarHandlePtr GRPCClient::_AsyncGetVar(
// stub context
s->response_call_back_ = ProcGetResponse;
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
auto call =
s->stub_g_.PrepareUnaryCall(s->context_.get(), rpc_path, buf, &cq_);
......@@ -235,7 +235,7 @@ VarHandlePtr GRPCClient::AsyncPrefetchVar(const std::string& ep,
// stub context
s->response_call_back_ = ProcGetResponse;
platform::RecordRPCEvent record_event(method, p_ctx);
platform::RecordRPCEvent record_event(method);
auto call = s->stub_g_.PrepareUnaryCall(
s->context_.get(), "/sendrecv.SendRecvService/PrefetchVariable", req,
......@@ -265,7 +265,7 @@ VarHandlePtr GRPCClient::AsyncSendBatchBarrier(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(BATCH_BARRIER_MESSAGE);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......@@ -290,7 +290,7 @@ VarHandlePtr GRPCClient::AsyncSendFetchBarrier(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(FETCH_BARRIER_MESSAGE);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncGetVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......@@ -317,7 +317,7 @@ VarHandlePtr GRPCClient::AsyncGetMonomerBarrier(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(var_name);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncGetMonomerBarrier(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......@@ -342,7 +342,7 @@ VarHandlePtr GRPCClient::AsyncSendComplete(const std::string& ep,
sendrecv::VariableMessage req;
req.set_varname(COMPLETE_MESSAGE);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncSendVariable(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......@@ -372,7 +372,7 @@ VarHandlePtr GRPCClient::AsyncCheckpointNotify(const std::string& ep,
req.set_varname(CHECKPOINT_SAVE_MESSAGE);
req.set_out_varname(dir);
platform::RecordRPCEvent record_event(method, nullptr);
platform::RecordRPCEvent record_event(method);
auto rpc = s->stub_->AsyncCheckpointNotify(s->context_.get(), req, &cq_);
rpc->Finish(&s->reply_, &s->status_, reinterpret_cast<void*>(s));
......
......@@ -38,7 +38,7 @@ void SerializeToByteBuffer(const std::string& name, framework::Variable* var,
::grpc::ByteBuffer* msg, const std::string& out_name,
const int trainer_id,
const std::string& table_name) {
platform::RecordRPCEvent record_event("serial", &ctx);
platform::RecordRPCEvent record_event("serial");
VarMsg request;
TensorPayload* payload = nullptr;
......@@ -147,7 +147,7 @@ void DeserializeFromByteBuffer(const ::grpc::ByteBuffer& msg,
const platform::DeviceContext& ctx,
const framework::Scope* scope,
framework::Variable** var, int* trainer_id) {
platform::RecordRPCEvent record_event("deserial", &ctx);
platform::RecordRPCEvent record_event("deserial");
operators::distributed::GRPCVariableResponse resp(scope, &ctx);
PADDLE_ENFORCE(resp.Parse(msg) == 0, "parse bytebuffer to tensor error!");
*var = resp.GetVar();
......
......@@ -85,9 +85,7 @@ class ReadOp : public framework::OperatorBase {
std::vector<framework::LoDTensor> ins;
// For profiling
platform::DeviceContextPool& pool = platform::DeviceContextPool::Instance();
auto& ctx = *pool.Get(dev_place);
platform::RecordEvent record_event(Type(), &ctx);
platform::RecordEvent record_event(Type());
reader->ReadNext(&ins);
if (ins.empty()) {
......
......@@ -88,7 +88,11 @@ cc_library(timer SRCS timer.cc)
cc_test(timer_test SRCS timer_test.cc DEPS timer)
cc_library(device_tracer SRCS device_tracer.cc DEPS boost profiler_proto framework_proto ${GPU_CTX_DEPS})
cc_library(profiler SRCS profiler.cc DEPS device_context device_tracer)
if(WITH_GPU)
nv_library(profiler SRCS profiler.cc profiler.cu DEPS device_context device_tracer)
else()
cc_library(profiler SRCS profiler.cc DEPS device_context device_tracer)
endif()
cc_test(profiler_test SRCS profiler_test.cc DEPS profiler)
nv_test(float16_gpu_test SRCS float16_test.cu DEPS lod_tensor)
......
......@@ -14,17 +14,23 @@ limitations under the License. */
#include "paddle/fluid/platform/device_tracer.h"
#include <deque>
#include <forward_list>
#include <fstream>
#include <list>
#include <map>
#include <mutex> // NOLINT
#include <numeric>
#include <sstream>
#include <string>
#include <thread> // NOLINT
#include <unordered_map>
#include <utility>
#include <vector>
#include "glog/logging.h"
#include "google/protobuf/text_format.h"
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/string/printf.h"
namespace paddle {
......@@ -33,17 +39,31 @@ namespace {
// Tracking the nested block stacks of each thread.
thread_local std::deque<int> block_id_stack;
// Tracking the nested event stacks.
thread_local std::deque<std::string> annotation_stack;
thread_local std::deque<Event *> annotation_stack;
std::map<uint32_t, int32_t> system_thread_id_map;
std::once_flag tracer_once_flag;
DeviceTracer *tracer = nullptr;
void PrintCuptiHint() {
static bool showed = false;
if (showed) return;
showed = true;
LOG(WARNING) << "Invalid timestamp occured. Please try increasing the "
"FLAGS_multiple_of_cupti_buffer_size.";
}
} // namespace
#ifdef PADDLE_WITH_CUPTI
namespace {
// TODO(panyx0718): Revisit the buffer size here.
uint64_t kBufSize = 32 * 1024;
// The experimental best performance is
// the same size with CUPTI device buffer size(8M)
uint64_t kBufSize = 1024 * 1024 * 8;
uint64_t kAlignSize = 8;
std::unordered_map<CUpti_CallbackId, std::string> runtime_cbid_str,
driver_cbid_str;
#define ALIGN_BUFFER(buffer, align) \
(((uintptr_t)(buffer) & ((align)-1)) \
......@@ -92,15 +112,33 @@ std::string MemcpyKind(CUpti_ActivityMemcpyKind kind) {
return "MEMCPY";
}
std::string DriverKind(CUpti_CallbackId cbid) {
auto iter = driver_cbid_str.find(cbid);
if (iter == driver_cbid_str.end())
return "Driver API " + std::to_string(cbid);
return iter->second;
}
std::string RuntimeKind(CUpti_CallbackId cbid) {
auto iter = runtime_cbid_str.find(cbid);
if (iter == runtime_cbid_str.end())
return "Runtime API " + std::to_string(cbid);
return iter->second;
}
void EnableActivity() {
// Device activity record is created when CUDA initializes, so we
// want to enable it before cuInit() or any CUDA runtime call.
CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_MEMCPY));
CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_KERNEL));
CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_DEVICE));
CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_MEMSET));
CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_OVERHEAD));
CUPTI_CALL(
dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL));
// CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_KERNEL));
CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_DRIVER));
CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_RUNTIME));
// We don't track these activities for now.
// CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_MEMSET));
// CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_OVERHEAD));
// CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_DEVICE));
// CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_CONTEXT));
// CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_DRIVER));
// CUPTI_CALL(dynload::cuptiActivityEnable(CUPTI_ACTIVITY_KIND_RUNTIME));
......@@ -110,16 +148,17 @@ void EnableActivity() {
void DisableActivity() {
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_MEMCPY));
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_KERNEL));
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_DEVICE));
CUPTI_CALL(
dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_CONCURRENT_KERNEL));
// CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_DEVICE));
// Disable all other activity record kinds.
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_CONTEXT));
// CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_CONTEXT));
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_DRIVER));
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_RUNTIME));
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_MEMSET));
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_NAME));
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_MARKER));
CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_OVERHEAD));
// CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_MEMSET));
// CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_NAME));
// CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_MARKER));
// CUPTI_CALL(dynload::cuptiActivityDisable(CUPTI_ACTIVITY_KIND_OVERHEAD));
}
void CUPTIAPI bufferRequested(uint8_t **buffer, size_t *size,
......@@ -132,6 +171,11 @@ void CUPTIAPI bufferRequested(uint8_t **buffer, size_t *size,
void CUPTIAPI bufferCompleted(CUcontext ctx, uint32_t streamId, uint8_t *buffer,
size_t size, size_t validSize) {
static std::thread::id cupti_thread_id(0);
if (cupti_thread_id == std::thread::id(0))
cupti_thread_id = std::this_thread::get_id();
PADDLE_ENFORCE_EQ(std::this_thread::get_id(), cupti_thread_id,
"Only one thread is allowed to call bufferCompleted()");
CUptiResult status;
CUpti_Activity *record = NULL;
if (validSize > 0) {
......@@ -168,6 +212,23 @@ void CUPTIAPI bufferCompleted(CUcontext ctx, uint32_t streamId, uint8_t *buffer,
memcpy->correlationId, memcpy->bytes);
break;
}
case CUPTI_ACTIVITY_KIND_DRIVER: {
auto *api = reinterpret_cast<const CUpti_ActivityAPI *>(record);
if (api->start != 0 && api->end != 0)
// -1 device id represents CUDA api call
tracer->AddCPURecords(
DriverKind(api->cbid), api->start, api->end, -1,
GetThreadIdFromSystemThreadId(api->threadId));
break;
}
case CUPTI_ACTIVITY_KIND_RUNTIME: {
auto *api = reinterpret_cast<const CUpti_ActivityAPI *>(record);
if (api->start != 0 && api->end != 0)
tracer->AddCPURecords(
RuntimeKind(api->cbid), api->start, api->end, -1,
GetThreadIdFromSystemThreadId(api->threadId));
break;
}
default: { break; }
}
} else if (status == CUPTI_ERROR_MAX_LIMIT_REACHED) {
......@@ -183,21 +244,35 @@ void CUPTIAPI bufferCompleted(CUcontext ctx, uint32_t streamId, uint8_t *buffer,
dynload::cuptiActivityGetNumDroppedRecords(ctx, streamId, &dropped));
if (dropped != 0) {
fprintf(stderr, "Dropped %u activity records\n", (unsigned int)dropped);
PrintCuptiHint();
}
}
free(buffer);
}
void initCuptiCbidStr();
} // namespace
#endif // PADDLE_WITH_CUPTI
class DeviceTracerImpl : public DeviceTracer {
public:
DeviceTracerImpl() : enabled_(false) {}
DeviceTracerImpl() : enabled_(false) {
#ifdef PADDLE_WITH_CUPTI
initCuptiCbidStr();
#endif
}
void AddAnnotation(uint64_t id, const std::string &anno) {
void AddAnnotation(uint32_t id, Event *event) {
thread_local std::forward_list<std::pair<uint32_t, Event *>>
*local_correlations_pairs = nullptr;
if (local_correlations_pairs == nullptr) {
std::lock_guard<std::mutex> l(trace_mu_);
correlations_[id] = anno;
correlations_pairs.emplace_front();
local_correlations_pairs = &correlations_pairs.front();
}
local_correlations_pairs->push_front(std::make_pair(id, event));
}
void AddCPURecords(const std::string &anno, uint64_t start_ns,
......@@ -206,8 +281,13 @@ class DeviceTracerImpl : public DeviceTracer {
VLOG(1) << "Empty timeline annotation.";
return;
}
thread_local std::forward_list<CPURecord> *local_cpu_records_ = nullptr;
if (local_cpu_records_ == nullptr) {
std::lock_guard<std::mutex> l(trace_mu_);
cpu_records_.push_back(
cpu_records_.emplace_front();
local_cpu_records_ = &cpu_records_.front();
}
local_cpu_records_->push_front(
CPURecord{anno, start_ns, end_ns, device_id, thread_id});
}
......@@ -215,12 +295,13 @@ class DeviceTracerImpl : public DeviceTracer {
uint64_t end_ns, int64_t device_id, int64_t stream_id,
uint32_t correlation_id, uint64_t bytes) {
// 0 means timestamp information could not be collected for the kernel.
if (start_ns == 0 || end_ns == 0) {
if (start_ns == 0 || end_ns == 0 || start_ns == end_ns) {
VLOG(3) << name << " cannot be traced";
PrintCuptiHint();
return;
}
std::lock_guard<std::mutex> l(trace_mu_);
mem_records_.push_back(MemRecord{name, start_ns, end_ns, device_id,
// NOTE(liangdun): lock is not needed, only one thread call this function.
mem_records_.push_front(MemRecord{name, start_ns, end_ns, device_id,
stream_id, correlation_id, bytes});
}
......@@ -228,12 +309,13 @@ class DeviceTracerImpl : public DeviceTracer {
int64_t device_id, int64_t stream_id,
uint32_t correlation_id) {
// 0 means timestamp information could not be collected for the kernel.
if (start == 0 || end == 0) {
if (start == 0 || end == 0 || start == end) {
VLOG(3) << correlation_id << " cannot be traced";
PrintCuptiHint();
return;
}
std::lock_guard<std::mutex> l(trace_mu_);
kernel_records_.push_back(
// NOTE(liangdun): lock is not needed, only one thread call this function.
kernel_records_.push_front(
KernelRecord{name, start, end, device_id, stream_id, correlation_id});
}
......@@ -263,25 +345,80 @@ class DeviceTracerImpl : public DeviceTracer {
} else if (ret != CUPTI_SUCCESS) {
fprintf(stderr, "Failed to create CUPTI subscriber.\n");
}
CUPTI_CALL(
dynload::cuptiEnableCallback(1, subscriber_, CUPTI_CB_DOMAIN_DRIVER_API,
CUPTI_DRIVER_TRACE_CBID_cuLaunchKernel));
const std::vector<int> cbids {
CUPTI_RUNTIME_TRACE_CBID_cudaMemcpy_v3020,
CUPTI_RUNTIME_TRACE_CBID_cudaMemcpyAsync_v3020,
CUPTI_RUNTIME_TRACE_CBID_cudaLaunch_v3020,
CUPTI_RUNTIME_TRACE_CBID_cudaLaunchKernel_v7000
#if CUDA_VERSION >= 9000
,
CUPTI_RUNTIME_TRACE_CBID_cudaLaunchCooperativeKernel_v9000,
CUPTI_RUNTIME_TRACE_CBID_cudaLaunchCooperativeKernelMultiDevice_v9000
#endif
};
for (auto cbid : cbids)
CUPTI_CALL(dynload::cuptiEnableCallback(
1, subscriber_, CUPTI_CB_DOMAIN_RUNTIME_API, cbid));
CUPTI_CALL(dynload::cuptiGetTimestamp(&start_ns_));
#endif // PADDLE_WITH_CUPTI
enabled_ = true;
}
void Reset() {
#ifdef PADDLE_WITH_CUPTI
CUPTI_CALL(
dynload::cuptiActivityFlushAll(CUPTI_ACTIVITY_FLAG_FLUSH_FORCED));
#endif
std::lock_guard<std::mutex> l(trace_mu_);
kernel_records_.clear();
mem_records_.clear();
correlations_.clear();
for (auto &tmp : correlations_pairs) tmp.clear();
for (auto &tmp : cpu_records_) tmp.clear();
}
void GenEventKernelCudaElapsedTime() {
#ifdef PADDLE_WITH_CUPTI
if (correlations_.empty())
for (auto &tmp : correlations_pairs)
for (auto &pair : tmp) correlations_[pair.first] = pair.second;
for (const KernelRecord &r : kernel_records_) {
auto c = correlations_.find(r.correlation_id);
if (c != correlations_.end() && c->second != nullptr) {
Event *e = c->second;
e->AddCudaElapsedTime(r.start_ns, r.end_ns);
}
}
for (const auto &r : mem_records_) {
auto c = correlations_.find(r.correlation_id);
if (c != correlations_.end() && c->second != nullptr) {
Event *e = c->second;
e->AddCudaElapsedTime(r.start_ns, r.end_ns);
}
}
#endif
}
proto::Profile GenProfile(const std::string &profile_path) {
int miss = 0, find = 0;
std::lock_guard<std::mutex> l(trace_mu_);
proto::Profile profile_pb;
profile_pb.set_start_ns(start_ns_);
profile_pb.set_end_ns(end_ns_);
if (correlations_.empty())
for (auto &tmp : correlations_pairs)
for (auto &pair : tmp) correlations_[pair.first] = pair.second;
for (const KernelRecord &r : kernel_records_) {
auto *event = profile_pb.add_events();
event->set_type(proto::Event::GPUKernel);
if (correlations_.find(r.correlation_id) != correlations_.end()) {
event->set_name(correlations_.at(r.correlation_id));
auto c = correlations_.find(r.correlation_id);
if (c != correlations_.end() && c->second != nullptr) {
event->set_name(c->second->name());
event->set_detail_info(r.name);
find++;
} else {
VLOG(10) << "Missing Kernel Event: " + r.name;
miss++;
event->set_name(r.name);
}
event->set_start_ns(r.start_ns);
......@@ -289,8 +426,9 @@ class DeviceTracerImpl : public DeviceTracer {
event->set_sub_device_id(r.stream_id);
event->set_device_id(r.device_id);
}
for (const CPURecord &r : cpu_records_) {
VLOG(1) << "KernelRecord event miss: " << miss << " find: " << find;
for (auto &tmp : cpu_records_)
for (const CPURecord &r : tmp) {
auto *event = profile_pb.add_events();
event->set_type(proto::Event::CPU);
event->set_name(r.name);
......@@ -299,21 +437,30 @@ class DeviceTracerImpl : public DeviceTracer {
event->set_sub_device_id(r.thread_id);
event->set_device_id(r.device_id);
}
miss = find = 0;
for (const MemRecord &r : mem_records_) {
auto *event = profile_pb.add_events();
event->set_type(proto::Event::GPUKernel);
auto c = correlations_.find(r.correlation_id);
if (c != correlations_.end() && c->second != nullptr) {
event->set_name(c->second->name());
event->set_detail_info(r.name);
find++;
} else {
miss++;
event->set_name(r.name);
}
event->set_start_ns(r.start_ns);
event->set_end_ns(r.end_ns);
event->set_sub_device_id(r.stream_id);
event->set_device_id(r.device_id);
event->mutable_memcopy()->set_bytes(r.bytes);
}
VLOG(1) << "MemRecord event miss: " << miss << " find: " << find;
std::ofstream profile_f;
profile_f.open(profile_path, std::ios::out | std::ios::trunc);
std::string profile_str;
profile_pb.SerializeToString(&profile_str);
profile_f << profile_str;
profile_f.open(profile_path,
std::ios::out | std::ios::trunc | std::ios::binary);
profile_pb.SerializeToOstream(&profile_f);
profile_f.close();
return profile_pb;
}
......@@ -321,12 +468,13 @@ class DeviceTracerImpl : public DeviceTracer {
void Disable() {
#ifdef PADDLE_WITH_CUPTI
// flush might cause additional calls to DeviceTracker.
dynload::cuptiActivityFlushAll(CUPTI_ACTIVITY_FLAG_FLUSH_FORCED);
CUPTI_CALL(
dynload::cuptiActivityFlushAll(CUPTI_ACTIVITY_FLAG_FLUSH_FORCED));
#endif // PADDLE_WITH_CUPTI
std::lock_guard<std::mutex> l(trace_mu_);
#ifdef PADDLE_WITH_CUPTI
DisableActivity();
dynload::cuptiUnsubscribe(subscriber_);
CUPTI_CALL(dynload::cuptiUnsubscribe(subscriber_));
CUPTI_CALL(dynload::cuptiGetTimestamp(&end_ns_));
#endif // PADDLE_WITH_CUPTI
enabled_ = false;
......@@ -337,18 +485,10 @@ class DeviceTracerImpl : public DeviceTracer {
static void CUPTIAPI ApiCallback(void *userdata, CUpti_CallbackDomain domain,
CUpti_CallbackId cbid, const void *cbdata) {
auto *cbInfo = reinterpret_cast<const CUpti_CallbackData *>(cbdata);
DeviceTracer *tracer = reinterpret_cast<DeviceTracer *>(userdata);
if ((domain == CUPTI_CB_DOMAIN_DRIVER_API) &&
(cbid == CUPTI_DRIVER_TRACE_CBID_cuLaunchKernel)) {
DeviceTracerImpl *tracer = reinterpret_cast<DeviceTracerImpl *>(userdata);
if (cbInfo->callbackSite == CUPTI_API_ENTER) {
const std::string anno = !annotation_stack.empty()
? annotation_stack.back()
: cbInfo->symbolName;
tracer->AddAnnotation(cbInfo->correlationId, anno);
}
} else {
VLOG(1) << "Unhandled API Callback for " << domain << " " << cbid;
Event *event = CurAnnotation();
tracer->AddAnnotation(cbInfo->correlationId, event);
}
}
CUpti_SubscriberHandle subscriber_;
......@@ -357,10 +497,12 @@ class DeviceTracerImpl : public DeviceTracer {
bool enabled_;
uint64_t start_ns_;
uint64_t end_ns_;
std::vector<KernelRecord> kernel_records_;
std::vector<MemRecord> mem_records_;
std::vector<CPURecord> cpu_records_;
std::unordered_map<uint32_t, std::string> correlations_;
std::forward_list<KernelRecord> kernel_records_;
std::forward_list<MemRecord> mem_records_;
std::forward_list<std::forward_list<CPURecord>> cpu_records_;
std::forward_list<std::forward_list<std::pair<uint32_t, Event *>>>
correlations_pairs;
std::unordered_map<uint32_t, Event *> correlations_;
};
void CreateTracer(DeviceTracer **t) { *t = new DeviceTracerImpl(); }
......@@ -370,21 +512,104 @@ DeviceTracer *GetDeviceTracer() {
return tracer;
}
void SetCurAnnotation(const std::string &anno) {
annotation_stack.push_back(anno);
}
void SetCurAnnotation(Event *event) { annotation_stack.push_back(event); }
void ClearCurAnnotation() { annotation_stack.pop_back(); }
std::string CurAnnotation() {
if (annotation_stack.empty()) return "";
Event *CurAnnotation() {
if (annotation_stack.empty()) return nullptr;
return annotation_stack.back();
}
std::string CurAnnotationName() {
if (annotation_stack.empty()) return "";
return annotation_stack.back()->name();
}
void SetCurBlock(int block_id) { block_id_stack.push_back(block_id); }
void ClearCurBlock() { block_id_stack.pop_back(); }
int BlockDepth() { return block_id_stack.size(); }
uint32_t GetCurSystemThreadId() {
std::stringstream ss;
ss << std::this_thread::get_id();
uint32_t id = static_cast<uint32_t>(std::stoull(ss.str()));
return id;
}
void RecoreCurThreadId(int32_t id) {
auto gid = GetCurSystemThreadId();
VLOG(1) << "RecoreCurThreadId: " << gid << " -> " << id;
system_thread_id_map[gid] = id;
}
int32_t GetThreadIdFromSystemThreadId(uint32_t id) {
auto it = system_thread_id_map.find(id);
if (it != system_thread_id_map.end()) return it->second;
// return origin id if no event is recorded in this thread.
return static_cast<int32_t>(id);
}
#ifdef PADDLE_WITH_CUPTI
namespace {
void initCuptiCbidStr() {
static bool called = false;
if (called) return;
called = true;
#define REGISTER_RUNTIME_CBID_STR(cbid) \
runtime_cbid_str[CUPTI_RUNTIME_TRACE_CBID_##cbid] = #cbid
REGISTER_RUNTIME_CBID_STR(cudaBindTexture_v3020);
REGISTER_RUNTIME_CBID_STR(cudaConfigureCall_v3020);
REGISTER_RUNTIME_CBID_STR(cudaDeviceGetAttribute_v5000);
REGISTER_RUNTIME_CBID_STR(cudaDeviceGetStreamPriorityRange_v5050);
REGISTER_RUNTIME_CBID_STR(cudaDeviceSynchronize_v3020);
REGISTER_RUNTIME_CBID_STR(cudaDriverGetVersion_v3020);
REGISTER_RUNTIME_CBID_STR(cudaEventCreateWithFlags_v3020);
REGISTER_RUNTIME_CBID_STR(cudaEventDestroy_v3020);
REGISTER_RUNTIME_CBID_STR(cudaEventDestroy_v3020);
REGISTER_RUNTIME_CBID_STR(cudaEventQuery_v3020);
REGISTER_RUNTIME_CBID_STR(cudaEventRecord_v3020);
REGISTER_RUNTIME_CBID_STR(cudaFreeHost_v3020);
REGISTER_RUNTIME_CBID_STR(cudaFree_v3020);
REGISTER_RUNTIME_CBID_STR(cudaFuncGetAttributes_v3020);
REGISTER_RUNTIME_CBID_STR(cudaGetDeviceCount_v3020);
REGISTER_RUNTIME_CBID_STR(cudaGetDeviceProperties_v3020);
REGISTER_RUNTIME_CBID_STR(cudaGetDevice_v3020);
REGISTER_RUNTIME_CBID_STR(cudaGetErrorString_v3020);
REGISTER_RUNTIME_CBID_STR(cudaGetLastError_v3020);
REGISTER_RUNTIME_CBID_STR(cudaHostAlloc_v3020);
REGISTER_RUNTIME_CBID_STR(cudaHostGetDevicePointer_v3020);
REGISTER_RUNTIME_CBID_STR(cudaLaunchKernel_v7000);
REGISTER_RUNTIME_CBID_STR(cudaMallocHost_v3020);
REGISTER_RUNTIME_CBID_STR(cudaMalloc_v3020);
REGISTER_RUNTIME_CBID_STR(cudaMemcpyAsync_v3020);
REGISTER_RUNTIME_CBID_STR(cudaMemcpy_v3020);
REGISTER_RUNTIME_CBID_STR(cudaMemsetAsync_v3020);
REGISTER_RUNTIME_CBID_STR(cudaMemset_v3020);
REGISTER_RUNTIME_CBID_STR(
cudaOccupancyMaxActiveBlocksPerMultiprocessorWithFlags_v7000);
REGISTER_RUNTIME_CBID_STR(cudaPeekAtLastError_v3020);
REGISTER_RUNTIME_CBID_STR(cudaRuntimeGetVersion_v3020);
REGISTER_RUNTIME_CBID_STR(cudaSetDevice_v3020);
REGISTER_RUNTIME_CBID_STR(cudaStreamCreate_v3020);
REGISTER_RUNTIME_CBID_STR(cudaStreamCreateWithFlags_v5000);
REGISTER_RUNTIME_CBID_STR(cudaStreamCreateWithPriority_v5050);
REGISTER_RUNTIME_CBID_STR(cudaStreamDestroy_v5050);
REGISTER_RUNTIME_CBID_STR(cudaStreamSynchronize_v3020);
REGISTER_RUNTIME_CBID_STR(cudaStreamWaitEvent_v3020);
REGISTER_RUNTIME_CBID_STR(cudaUnbindTexture_v3020);
#if CUDA_VERSION >= 9000
REGISTER_RUNTIME_CBID_STR(cudaLaunchCooperativeKernel_v9000);
REGISTER_RUNTIME_CBID_STR(cudaLaunchCooperativeKernelMultiDevice_v9000);
#endif
#undef REGISTER_RUNTIME_CBID_STR
}
} // namespace
#endif // PADDLE_WITH_CUPTI
} // namespace platform
} // namespace paddle
......@@ -32,6 +32,8 @@ inline uint64_t PosixInNsec() {
return 1000 * (static_cast<uint64_t>(tv.tv_sec) * 1000000 + tv.tv_usec);
}
class Event;
// DeviceTracer performs the following tasks:
// 1. Register cuda callbacks for various events: kernel, memcpy, etc.
// 2. Collect cuda statistics: start/end ts, memory, etc.
......@@ -68,11 +70,13 @@ class DeviceTracer {
virtual void Enable() = 0;
// Needs to be called once after use.
virtual void Disable() = 0;
// Needs to be called once before reuse.
virtual void Reset() = 0;
// Add a pair to correlate internal cuda id with high level
// annotation (string). So cuda statistics can be represented by
// annotation event(with string). So cuda statistics can be represented by
// human-readable annotations.
virtual void AddAnnotation(uint64_t id, const std::string& anno) = 0;
virtual void AddAnnotation(uint32_t id, Event* event) = 0;
virtual void AddMemRecords(const std::string& name, uint64_t start_ns,
uint64_t end_ns, int64_t device_id,
......@@ -92,6 +96,9 @@ class DeviceTracer {
// Generate a proto after done (Disabled).
virtual proto::Profile GenProfile(const std::string& profile_path) = 0;
// generate kernel elapsed time into Event
virtual void GenEventKernelCudaElapsedTime() = 0;
virtual bool IsEnabled() = 0;
};
......@@ -99,14 +106,19 @@ class DeviceTracer {
DeviceTracer* GetDeviceTracer();
// Set a name for the cuda kernel operation being launched by the thread.
void SetCurAnnotation(const std::string& anno);
void SetCurAnnotation(Event* event);
// Clear the name after the operation is done.
void ClearCurAnnotation();
// Current name of the operation being run in the thread.
std::string CurAnnotation();
std::string CurAnnotationName();
Event* CurAnnotation();
void SetCurBlock(int block_id);
void ClearCurBlock();
int BlockDepth();
// Set current thread id, so we can map the system thread id to thread id.
void RecoreCurThreadId(int32_t id);
int32_t GetThreadIdFromSystemThreadId(uint32_t id);
} // namespace platform
} // namespace paddle
......@@ -22,6 +22,7 @@ limitations under the License. */
#include "paddle/fluid/string/split.h"
#ifdef PADDLE_WITH_CUDA
#include "paddle/fluid/platform/cuda_device_guard.h"
#include "paddle/fluid/platform/dynload/cupti.h"
#endif
#include "paddle/fluid/platform/device_context.h"
#include "paddle/fluid/platform/init.h"
......@@ -30,6 +31,9 @@ limitations under the License. */
DEFINE_int32(paddle_num_threads, 1,
"Number of threads for each paddle instance.");
DEFINE_int32(multiple_of_cupti_buffer_size, 1,
"Multiple of the CUPTI device buffer size. If the timestamps have "
"been dropped when you are profiling, try increasing this value.");
namespace paddle {
namespace framework {
......@@ -78,7 +82,32 @@ void InitP2P(std::vector<int> devices) {
#endif
}
void InitCupti() {
#ifdef PADDLE_WITH_CUPTI
if (FLAGS_multiple_of_cupti_buffer_size == 1) return;
size_t attrValue = 0, attrValueSize = sizeof(size_t);
#define MULTIPLY_ATTR_VALUE(attr) \
{ \
PADDLE_ENFORCE(!platform::dynload::cuptiActivityGetAttribute( \
attr, &attrValueSize, &attrValue)); \
attrValue *= FLAGS_multiple_of_cupti_buffer_size; \
LOG(WARNING) << "Set " #attr " " << attrValue << " byte"; \
PADDLE_ENFORCE(!platform::dynload::cuptiActivitySetAttribute( \
attr, &attrValueSize, &attrValue)); \
}
MULTIPLY_ATTR_VALUE(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE);
MULTIPLY_ATTR_VALUE(CUPTI_ACTIVITY_ATTR_DEVICE_BUFFER_SIZE_CDP);
#if CUDA_VERSION >= 9000
MULTIPLY_ATTR_VALUE(CUPTI_ACTIVITY_ATTR_PROFILING_SEMAPHORE_POOL_SIZE);
#endif
#undef MULTIPLY_ATTR_VALUE
#endif
}
void InitDevices(bool init_p2p) {
// CUPTI attribute should be set before any CUDA context is created (see CUPTI
// documentation about CUpti_ActivityAttribute).
InitCupti();
/*Init all available devices by default */
std::vector<int> devices;
#ifdef PADDLE_WITH_CUDA
......
......@@ -12,6 +12,8 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/platform/profiler.h"
#include <algorithm>
#include <iomanip>
#include <limits>
......@@ -27,7 +29,6 @@ limitations under the License. */
#include "paddle/fluid/framework/block_desc.h"
#include "paddle/fluid/platform/device_tracer.h"
#include "paddle/fluid/platform/port.h"
#include "paddle/fluid/platform/profiler.h"
#include "paddle/fluid/string/printf.h"
DEFINE_bool(enable_rpc_profiler, false, "Enable rpc profiler or not.");
......@@ -66,12 +67,13 @@ struct EventList {
((kEventSize + kEventAlign - 1) / kEventAlign * kEventAlign);
template <typename... Args>
void Record(Args&&... args) {
Event* Record(Args&&... args) {
if (event_blocks.empty() || event_blocks.front().size() == kNumBlock) {
event_blocks.emplace_front();
event_blocks.front().reserve(kNumBlock);
}
event_blocks.front().emplace_back(std::forward<Args>(args)...);
return &event_blocks.front().back();
}
std::vector<Event> Reduce() {
......@@ -98,21 +100,8 @@ inline uint64_t GetTimeInNsec() {
.count();
}
Event::Event(EventType type, std::string name, uint32_t thread_id,
const DeviceContext* dev_ctx)
: type_(type), name_(name), thread_id_(thread_id), has_cuda_(false) {
#ifdef PADDLE_WITH_CUDA
has_cuda_ = dev_ctx ? platform::is_gpu_place(dev_ctx->GetPlace()) : false;
if (has_cuda_) {
auto* cuda_dev_ctx = static_cast<const CUDADeviceContext*>(dev_ctx);
PADDLE_ENFORCE(cudaSetDevice(
boost::get<platform::CUDAPlace>(cuda_dev_ctx->GetPlace()).device));
PADDLE_ENFORCE(cudaGetDevice(&device_));
PADDLE_ENFORCE(cudaEventCreate(&event_));
auto stream = cuda_dev_ctx->stream();
PADDLE_ENFORCE(cudaEventRecord(event_, stream));
}
#endif
Event::Event(EventType type, std::string name, uint32_t thread_id)
: type_(type), name_(name), thread_id_(thread_id) {
cpu_ns_ = GetTimeInNsec();
}
......@@ -124,88 +113,70 @@ double Event::CpuElapsedMs(const Event& e) const {
double Event::CudaElapsedMs(const Event& e) const {
#ifdef PADDLE_WITH_CUDA
if (!has_cuda_) return 0.0;
PADDLE_ENFORCE(e.has_cuda() && has_cuda());
PADDLE_ENFORCE(e.device() == device());
PADDLE_ENFORCE(cudaEventSynchronize(event_));
PADDLE_ENFORCE(cudaEventSynchronize(e.event()));
float ms;
PADDLE_ENFORCE(cudaEventElapsedTime(&ms, event_, e.event()));
return ms;
#ifdef PADDLE_WITH_CUPTI
return gpu_ns_ / 1000000.0;
#endif
#else
PADDLE_THROW("CUDA is not enabled");
#endif
}
#ifdef PADDLE_WITH_CUDA
static void ForEachDevice(std::function<void(int)> func) {
auto original_device = GetCurrentDeviceId();
int count = GetCUDADeviceCount();
for (int i = 0; i < count; i++) {
SetDeviceId(i);
func(i);
}
SetDeviceId(original_device);
}
#endif
inline EventList& GetEventList() {
if (!g_event_list) {
std::lock_guard<std::mutex> guard(g_all_event_lists_mutex);
g_event_list = std::make_shared<EventList>();
g_thread_id = g_next_thread_id++;
g_all_event_lists.emplace_front(g_event_list);
RecoreCurThreadId(g_thread_id);
}
return *g_event_list;
}
void Mark(const std::string& name, const DeviceContext* dev_ctx) {
GetEventList().Record(EventType::kMark, name, g_thread_id, dev_ctx);
void Mark(const std::string& name) {
GetEventList().Record(EventType::kMark, name, g_thread_id);
}
void PushEvent(const std::string& name, const DeviceContext* dev_ctx) {
GetEventList().Record(EventType::kPushRange, name, g_thread_id, dev_ctx);
Event* PushEvent(const std::string& name) {
return GetEventList().Record(EventType::kPushRange, name, g_thread_id);
}
void PopEvent(const std::string& name, const DeviceContext* dev_ctx) {
GetEventList().Record(EventType::kPopRange, name, g_thread_id, dev_ctx);
void PopEvent(const std::string& name) {
GetEventList().Record(EventType::kPopRange, name, g_thread_id);
}
RecordEvent::RecordEvent(const std::string& name, const DeviceContext* dev_ctx)
RecordEvent::RecordEvent(const std::string& name)
: is_enabled_(false), start_ns_(PosixInNsec()) {
if (g_state == ProfilerState::kDisabled) return;
std::lock_guard<std::mutex> l(profiler_mu);
// lock is not needed, the code below is thread-safe
is_enabled_ = true;
dev_ctx_ = dev_ctx;
name_ = name;
PushEvent(name_, dev_ctx_);
Event* e = PushEvent(name_);
// Maybe need the same push/pop behavior.
SetCurAnnotation(name_);
SetCurAnnotation(e);
}
RecordEvent::~RecordEvent() {
if (g_state == ProfilerState::kDisabled || !is_enabled_) return;
std::lock_guard<std::mutex> l(profiler_mu);
// lock is not needed, the code below is thread-safe
DeviceTracer* tracer = GetDeviceTracer();
if (tracer) {
tracer->AddCPURecords(CurAnnotation(), start_ns_, PosixInNsec(),
tracer->AddCPURecords(CurAnnotationName(), start_ns_, PosixInNsec(),
BlockDepth(), g_thread_id);
}
ClearCurAnnotation();
PopEvent(name_, dev_ctx_);
PopEvent(name_);
}
RecordRPCEvent::RecordRPCEvent(const std::string& name,
const DeviceContext* dev_ctx) {
RecordRPCEvent::RecordRPCEvent(const std::string& name) {
if (FLAGS_enable_rpc_profiler) {
event_.reset(new platform::RecordEvent(name, dev_ctx));
event_.reset(new platform::RecordEvent(name));
}
}
RecordBlock::RecordBlock(int block_id)
: is_enabled_(false), start_ns_(PosixInNsec()) {
std::lock_guard<std::mutex> l(profiler_mu);
// lock is not needed, the code below is thread-safe
if (g_state == ProfilerState::kDisabled) return;
is_enabled_ = true;
SetCurBlock(block_id);
......@@ -213,7 +184,7 @@ RecordBlock::RecordBlock(int block_id)
}
RecordBlock::~RecordBlock() {
std::lock_guard<std::mutex> l(profiler_mu);
// lock is not needed, the code below is thread-safe
if (g_state == ProfilerState::kDisabled || !is_enabled_) return;
DeviceTracer* tracer = GetDeviceTracer();
if (tracer) {
......@@ -225,11 +196,21 @@ RecordBlock::~RecordBlock() {
ClearCurBlock();
}
void SynchronizeAllDevice() {
#ifdef PADDLE_WITH_CUDA
int count = GetCUDADeviceCount();
for (int i = 0; i < count; i++) {
SetDeviceId(i);
PADDLE_ENFORCE(cudaDeviceSynchronize());
}
#endif
}
void EnableProfiler(ProfilerState state) {
PADDLE_ENFORCE(state != ProfilerState::kDisabled,
"Can't enable profiling, since the input state is ",
"ProfilerState::kDisabled");
SynchronizeAllDevice();
std::lock_guard<std::mutex> l(profiler_mu);
if (state == g_state) {
return;
......@@ -238,23 +219,20 @@ void EnableProfiler(ProfilerState state) {
should_send_profile_state = true;
GetDeviceTracer()->Enable();
#ifdef PADDLE_WITH_CUDA
if (g_state == ProfilerState::kCUDA) {
if (g_state == ProfilerState::kCUDA || g_state == ProfilerState::kAll ||
g_state == ProfilerState::kCPU) {
// Generate some dummy events first to reduce the startup overhead.
for (int i = 0; i < 5; i++) {
ForEachDevice([](int d) {
DeviceContext* dev_ctx = new CUDADeviceContext(CUDAPlace(d));
Mark("_cuda_startup_", dev_ctx);
dev_ctx->Wait();
delete dev_ctx;
});
}
DummyKernelAndEvent();
GetDeviceTracer()->Reset();
}
#endif
// Mark the profiling start.
Mark("_start_profiler_", nullptr);
Mark("_start_profiler_");
}
void ResetProfiler() {
SynchronizeAllDevice();
GetDeviceTracer()->Reset();
std::lock_guard<std::mutex> guard(g_all_event_lists_mutex);
for (auto it = g_all_event_lists.begin(); it != g_all_event_lists.end();
++it) {
......@@ -481,20 +459,23 @@ void ParseEvents(const std::vector<std::vector<Event>>& events,
void DisableProfiler(EventSortingKey sorted_key,
const std::string& profile_path) {
SynchronizeAllDevice();
std::lock_guard<std::mutex> l(profiler_mu);
if (g_state == ProfilerState::kDisabled) return;
// Mark the profiling stop.
Mark("_stop_profiler_", nullptr);
Mark("_stop_profiler_");
std::vector<std::vector<Event>> all_events = GetAllEvents();
ParseEvents(all_events, true, sorted_key);
ParseEvents(all_events, false, sorted_key);
ResetProfiler();
DeviceTracer* tracer = GetDeviceTracer();
if (tracer->IsEnabled()) {
tracer->Disable();
tracer->GenProfile(profile_path);
tracer->GenEventKernelCudaElapsedTime();
}
std::vector<std::vector<Event>> all_events = GetAllEvents();
ParseEvents(all_events, true, sorted_key);
ParseEvents(all_events, false, sorted_key);
ResetProfiler();
g_state = ProfilerState::kDisabled;
should_send_profile_state = true;
}
......
/* Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#include "paddle/fluid/platform/profiler.h"
#include <cuda.h>
namespace paddle {
namespace platform {
__global__ void DummyKernel(int *a) { a[0] = 0; }
static void ForEachDevice(std::function<void(int)> func) {
auto original_device = GetCurrentDeviceId();
int count = GetCUDADeviceCount();
for (int i = 0; i < count; i++) {
SetDeviceId(i);
func(i);
}
SetDeviceId(original_device);
}
void DummyKernelAndEvent() {
for (int i = 0; i < 5; i++) {
ForEachDevice([](int d) {
CUDADeviceContext *dev_ctx = new CUDADeviceContext(CUDAPlace(d));
Mark("_cuda_startup_");
int *ptr;
PADDLE_ENFORCE(cudaMalloc(&ptr, sizeof(int)));
DummyKernel<<<1, 1, 0, dev_ctx->stream()>>>(ptr);
dev_ctx->Wait();
PADDLE_ENFORCE(cudaFree(ptr));
delete dev_ctx;
});
}
}
} // namespace platform
} // namespace paddle
......@@ -28,17 +28,17 @@ class Event {
public:
// The DeviceContext is used to get the cuda stream.
// If CPU profiling mode, can pass nullptr.
Event(EventType type, std::string name, uint32_t thread_id,
const DeviceContext* dev_ctx);
Event(EventType type, std::string name, uint32_t thread_id);
const EventType& type() const;
std::string name() const { return name_; }
uint32_t thread_id() const { return thread_id_; }
bool has_cuda() const { return has_cuda_; }
#ifdef PADDLE_WITH_CUDA
#ifndef PADDLE_WITH_CUPTI
cudaEvent_t event() const { return event_; }
int device() const { return device_; }
#endif
#endif
double CpuElapsedMs(const Event& e) const;
......@@ -49,11 +49,21 @@ class Event {
std::string name_;
uint32_t thread_id_;
int64_t cpu_ns_;
bool has_cuda_;
#ifdef PADDLE_WITH_CUDA
#ifdef PADDLE_WITH_CUPTI
int64_t gpu_ns_ = 0;
public:
void AddCudaElapsedTime(int64_t start_ns, int64_t end_ns) {
gpu_ns_ += end_ns - start_ns;
}
private:
#else
cudaEvent_t event_ = nullptr;
int device_ = -1;
#endif
#endif
};
enum ProfilerState {
......@@ -63,22 +73,19 @@ enum ProfilerState {
kAll, // Profile both CPU and GPU. (Currently experimental).
};
void Mark(const std::string& name, const DeviceContext* dev_ctx);
void Mark(const std::string& name);
void PushEvent(const std::string& name, const DeviceContext* dev_ctx);
Event* PushEvent(const std::string& name);
void PopEvent(const std::string& name, const DeviceContext* dev_ctx);
void PopEvent(const std::string& name);
struct RecordEvent {
// dev_ctx can be set to nullptr if device is cpu.
RecordEvent(const std::string& name, const DeviceContext* dev_ctx);
explicit RecordEvent(const std::string& name);
~RecordEvent();
bool is_enabled_;
uint64_t start_ns_;
// The device context is used by Event to get the current cuda stream.
const DeviceContext* dev_ctx_;
// Event name
std::string name_;
// Need to distinguish name by op type, block_id, program_id and perhaps
......@@ -88,8 +95,7 @@ struct RecordEvent {
class RecordRPCEvent {
public:
// dev_ctx can be set to nullptr if device is cpu.
RecordRPCEvent(const std::string& name, const DeviceContext* dev_ctx);
explicit RecordRPCEvent(const std::string& name);
~RecordRPCEvent() {}
private:
......@@ -132,5 +138,9 @@ bool ShouldSendProfileState();
void SetProfileListener();
int64_t ListenerId();
#ifdef PADDLE_WITH_CUDA
void DummyKernelAndEvent();
#endif
} // namespace platform
} // namespace paddle
......@@ -31,6 +31,7 @@ message Event {
optional int64 sub_device_id = 6;
optional MemCopy memcopy = 7;
optional string detail_info = 9;
}
message Profile {
......
......@@ -23,76 +23,49 @@ TEST(Event, CpuElapsedTime) {
using paddle::platform::Event;
using paddle::platform::EventType;
Event start_event(EventType::kPushRange, "test", 0, nullptr);
EXPECT_TRUE(start_event.has_cuda() == false);
Event start_event(EventType::kPushRange, "test", 0);
int counter = 0;
while (counter != 1000) {
counter++;
}
Event stop_event(EventType::kPopRange, "test", 0, nullptr);
Event stop_event(EventType::kPopRange, "test", 0);
EXPECT_GT(start_event.CpuElapsedMs(stop_event), 0);
}
#ifdef PADDLE_WITH_CUDA
TEST(Event, CudaElapsedTime) {
using paddle::platform::DeviceContext;
using paddle::platform::CUDADeviceContext;
using paddle::platform::CUDAPlace;
using paddle::platform::Event;
using paddle::platform::EventType;
DeviceContext* dev_ctx = new CUDADeviceContext(CUDAPlace(0));
Event start_event(EventType::kPushRange, "test", 0, dev_ctx);
EXPECT_TRUE(start_event.has_cuda() == true);
int counter = 0;
while (counter != 1000) {
counter++;
}
Event stop_event(EventType::kPopRange, "test", 0, dev_ctx);
EXPECT_GT(start_event.CudaElapsedMs(stop_event), 0);
}
#endif
TEST(RecordEvent, RecordEvent) {
using paddle::platform::DeviceContext;
using paddle::platform::Event;
using paddle::platform::EventType;
using paddle::platform::RecordEvent;
using paddle::platform::PushEvent;
using paddle::platform::PopEvent;
using paddle::platform::ProfilerState;
using paddle::platform::EventSortingKey;
ProfilerState state = ProfilerState::kCPU;
DeviceContext* dev_ctx = nullptr;
#ifdef PADDLE_WITH_CUDA
using paddle::platform::CUDADeviceContext;
using paddle::platform::CUDAPlace;
state = ProfilerState::kCUDA;
dev_ctx =
new paddle::platform::CUDADeviceContext(paddle::platform::CUDAPlace(0));
#endif
EnableProfiler(state);
/* Usage 1:
* PushEvent(evt_name, dev_ctx);
* PushEvent(evt_name);
* ...
* code to be analyzed
* ...
* PopEvent(evt_name, dev_ctx);
* PopEvent(evt_name);
*/
LOG(INFO) << "Usage 1: PushEvent & PopEvent";
for (int loop = 0; loop < 3; ++loop) {
for (int i = 1; i < 5; ++i) {
std::string name = "op_" + std::to_string(i);
PushEvent(name, dev_ctx);
PushEvent(name);
int counter = 1;
while (counter != i * 1000) counter++;
PopEvent(name, dev_ctx);
PopEvent(name);
}
}
/* Usage 2:
* {
* RecordEvent record_event(name, dev_ctx);
* RecordEvent record_event(name);
* ...
* code to be analyzed
* ...
......@@ -101,7 +74,7 @@ TEST(RecordEvent, RecordEvent) {
LOG(INFO) << "Usage 2: RecordEvent";
for (int i = 1; i < 5; ++i) {
std::string name = "evs_op_" + std::to_string(i);
RecordEvent record_event(name, dev_ctx);
RecordEvent record_event(name);
int counter = 1;
while (counter != i * 1000) counter++;
}
......@@ -123,20 +96,20 @@ TEST(RecordEvent, RecordEvent) {
LOG(INFO) << "Usage 3: nested RecordEvent";
for (int i = 1; i < 5; ++i) {
std::string name = "ano_evs_op_" + std::to_string(i);
RecordEvent record_event(name, dev_ctx);
RecordEvent record_event(name);
int counter = 1;
while (counter != i * 100) counter++;
{
std::string nested_name = "nested_ano_evs_op_" + std::to_string(i);
RecordEvent nested_record_event(nested_name, dev_ctx);
RecordEvent nested_record_event(nested_name);
int nested_counter = 1;
while (nested_counter != i * 100) nested_counter++;
}
}
// Bad Usage:
PushEvent("event_without_pop", dev_ctx);
PopEvent("event_without_push", dev_ctx);
PushEvent("event_without_pop");
PopEvent("event_without_push");
std::vector<std::vector<Event>> events = paddle::platform::GetAllEvents();
int cuda_startup_count = 0;
......
......@@ -131,7 +131,8 @@ def __bootstrap__():
'eager_delete_tensor_gb', 'fast_eager_deletion_mode',
'allocator_strategy', 'reader_queue_speed_test_mode',
'print_sub_graph_dir', 'pe_profile_fname', 'warpctc_dir',
'inner_op_parallelism', 'enable_parallel_graph'
'inner_op_parallelism', 'enable_parallel_graph',
'multiple_of_cupti_buffer_size'
]
if 'Darwin' not in sysstr:
read_env_flags.append('use_pinned_memory')
......
......@@ -649,6 +649,7 @@ class AdagradOptimizer(Optimizer):
regularization: A Regularizer, such as
fluid.regularizer.L2DecayRegularizer.
name: A optional name prefix.
initial_accumulator_value (float): Initial value for moment accumulator.
Examples:
.. code-block:: python
......@@ -662,7 +663,8 @@ class AdagradOptimizer(Optimizer):
learning_rate,
epsilon=1.0e-6,
regularization=None,
name=None):
name=None,
initial_accumulator_value=0.0):
assert learning_rate is not None
assert epsilon is not None
super(AdagradOptimizer, self).__init__(
......@@ -671,6 +673,7 @@ class AdagradOptimizer(Optimizer):
name=name)
self.type = "adagrad"
self._epsilon = epsilon
self.initial_accumulator_value = initial_accumulator_value
def _create_accumulators(self, block, parameters):
assert isinstance(block, framework.Block)
......@@ -683,6 +686,16 @@ class AdagradOptimizer(Optimizer):
moment_acc = self._get_accumulator(self._moment_acc_str,
param_and_grad[0])
startup_block = framework.default_startup_program().global_block()
startup_block.append_op(
type='fill_constant',
inputs={},
outputs={'Out': [moment_acc]},
attrs={
'dtype': moment_acc.dtype,
'value': self.initial_accumulator_value,
'shape': moment_acc.shape,
})
# Create the adagrad optimizer op
adagrad_op = block.append_op(
......
......@@ -274,7 +274,7 @@ class TestAdagradOptimizer(unittest.TestCase):
# Check init_program
init_ops = init_program.global_block().ops
self.assertEqual(len(init_ops), 2)
self.assertEqual(len(init_ops), 3)
self.assertEqual(init_ops[0].type, "fill_constant")
self.assertAlmostEqual(init_ops[0].attr('value'), learning_rate)
self.assertEqual(init_ops[1].type, "fill_constant")
......
# Copyright (c) 2018 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import print_function
import unittest
import numpy as np
import os
os.environ['FLAGS_enable_parallel_graph'] = str(1)
import paddle.fluid.core as core
import os
import paddle.fluid as fluid
from parallel_executor_test_base import TestParallelExecutorBase
def simple_fc_net(use_feed):
img = fluid.layers.data(name='image', shape=[784], dtype='float32')
label = fluid.layers.data(name='label', shape=[1], dtype='int64')
hidden = img
for _ in range(4):
hidden = fluid.layers.fc(
hidden,
size=200,
act='tanh',
bias_attr=fluid.ParamAttr(
initializer=fluid.initializer.Constant(value=1.0)))
prediction = fluid.layers.fc(hidden, size=10, act='softmax')
loss = fluid.layers.cross_entropy(input=prediction, label=label)
loss = fluid.layers.mean(loss)
return loss
class TestMNIST(TestParallelExecutorBase):
@classmethod
def setUpClass(cls):
os.environ['CPU_NUM'] = str(4)
def _init_data(self):
np.random.seed(5)
img = np.random.random(size=[32, 784]).astype(np.float32)
label = np.ones(shape=[32, 1], dtype='int64')
return img, label
# simple_fc
def check_simple_fc_convergence(self, use_cuda, use_reduce=False):
if use_cuda and not core.is_compiled_with_cuda():
return
img, label = self._init_data()
self.check_network_convergence(
simple_fc_net,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_reduce=use_reduce)
def test_simple_fc(self):
# use_cuda
self.check_simple_fc_convergence(True)
def check_simple_fc_parallel_accuracy(self, use_cuda):
if use_cuda and not core.is_compiled_with_cuda():
return
img, label = self._init_data()
single_first_loss, single_last_loss = self.check_network_convergence(
method=simple_fc_net,
seed=1,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_parallel_executor=False)
parallel_first_loss, parallel_last_loss = self.check_network_convergence(
method=simple_fc_net,
seed=1,
feed_dict={"image": img,
"label": label},
use_cuda=use_cuda,
use_parallel_executor=True)
self.assertAlmostEquals(
np.mean(parallel_first_loss),
single_first_loss,
delta=1e-6, )
self.assertAlmostEquals(
np.mean(parallel_last_loss), single_last_loss, delta=1e-6)
def test_simple_fc_parallel_accuracy(self):
self.check_simple_fc_parallel_accuracy(True)
if __name__ == '__main__':
unittest.main()
......@@ -16,15 +16,19 @@ from __future__ import print_function
import unittest
import os
import tempfile
import numpy as np
import paddle.fluid as fluid
import paddle.fluid.profiler as profiler
import paddle.fluid.layers as layers
import paddle.fluid.core as core
import paddle.fluid.proto.profiler.profiler_pb2 as profiler_pb2
class TestProfiler(unittest.TestCase):
def net_profiler(self, state, profile_path='/tmp/profile'):
def net_profiler(self, state, use_parallel_executor=False):
profile_path = os.path.join(tempfile.gettempdir(), "profile")
open(profile_path, "w").write("")
startup_program = fluid.Program()
main_program = fluid.Program()
......@@ -60,6 +64,11 @@ class TestProfiler(unittest.TestCase):
place = fluid.CPUPlace() if state == 'CPU' else fluid.CUDAPlace(0)
exe = fluid.Executor(place)
exe.run(startup_program)
if use_parallel_executor:
pe = fluid.ParallelExecutor(
state != 'CPU',
loss_name=avg_cost.name,
main_program=main_program)
pass_acc_calculator = fluid.average.WeightedAverage()
with profiler.profiler(state, 'total', profile_path) as prof:
......@@ -69,6 +78,9 @@ class TestProfiler(unittest.TestCase):
x = np.random.random((32, 784)).astype("float32")
y = np.random.randint(0, 10, (32, 1)).astype("int64")
if use_parallel_executor:
pe.run(feed={'x': x, 'y': y}, fetch_list=[avg_cost.name])
continue
outs = exe.run(main_program,
feed={'x': x,
'y': y},
......@@ -77,21 +89,37 @@ class TestProfiler(unittest.TestCase):
b_size = np.array(outs[2])
pass_acc_calculator.add(value=acc, weight=b_size)
pass_acc = pass_acc_calculator.eval()
data = open(profile_path, 'rb').read()
self.assertGreater(len(data), 0)
profile_pb = profiler_pb2.Profile()
profile_pb.ParseFromString(data)
self.assertGreater(len(profile_pb.events), 0)
for event in profile_pb.events:
if event.type == profiler_pb2.Event.GPUKernel:
if not event.detail_info and not event.name.startswith("MEM"):
raise Exception(
"Kernel %s missing event. Has this kernel been recorded by RecordEvent?"
% event.name)
elif event.type == profiler_pb2.Event.CPU and (
event.name.startswith("Driver API") or
event.name.startswith("Runtime API")):
print("Warning: unregister", event.name)
def test_cpu_profiler(self):
self.net_profiler('CPU')
self.net_profiler('CPU', use_parallel_executor=True)
@unittest.skipIf(not core.is_compiled_with_cuda(),
"profiler is enabled only with GPU")
def test_cuda_profiler(self):
self.net_profiler('GPU')
self.net_profiler('GPU', use_parallel_executor=True)
@unittest.skipIf(not core.is_compiled_with_cuda(),
"profiler is enabled only with GPU")
def test_all_profiler(self):
self.net_profiler('All', '/tmp/profile_out')
with open('/tmp/profile_out', 'rb') as f:
self.assertGreater(len(f.read()), 0)
self.net_profiler('All')
self.net_profiler('All', use_parallel_executor=True)
if __name__ == '__main__':
......
......@@ -24,3 +24,8 @@ sed 's/<baseimg>/9.0-cudnn7-devel-centos6/g' Dockerfile.x64 | \
sed 's/<NCCL_MAKE_OPTS>/NVCC_GENCODE="-gencode=arch=compute_35,code=sm_35 -gencode=arch=compute_50,code=sm_50 -gencode=arch=compute_52,code=sm_52 -gencode=arch=compute_60,code=sm_60 -gencode=arch=compute_60,code=compute_60 -gencode=arch=compute_61,code=sm_61 -gencode=arch=compute_62,code=sm_62 -gencode=arch=compute_70,code=sm_70"/g'> Dockerfile.tmp
docker build -t ${REPO}/paddle_manylinux_devel:cuda9.0_cudnn7 -f Dockerfile.tmp .
docker push ${REPO}/paddle_manylinux_devel:cuda9.0_cudnn7
sed 's/<baseimg>/10.0-devel-centos6/g' Dockerfile.x64 | \
sed 's/<NCCL_MAKE_OPTS>/NVCC_GENCODE="-gencode=arch=compute_35,code=sm_35 -gencode=arch=compute_50,code=sm_50 -gencode=arch=compute_52,code=sm_52 -gencode=arch=compute_60,code=sm_60 -gencode=arch=compute_60,code=compute_60 -gencode=arch=compute_61,code=sm_61 -gencode=arch=compute_62,code=sm_62 -gencode=arch=compute_70,code=sm_70 -gencode=arch=compute_75,code=sm_75"/g'> Dockerfile.tmp
docker build -t ${REPO}/paddle_manylinux_devel:cuda10.0_cudnn7 -f Dockerfile.tmp .
docker push ${REPO}/paddle_manylinux_devel:cuda10.0_cudnn7
......@@ -107,11 +107,13 @@ curl-config --features
rm -rf /usr/local/ssl
# Install patchelf (latest with unreleased bug fixes)
curl -sLO https://nixos.org/releases/patchelf/patchelf-0.9/patchelf-0.9.tar.gz
check_sha256sum patchelf-0.9.tar.gz $PATCHELF_HASH
tar -xzf patchelf-0.9.tar.gz
(cd patchelf-0.9 && ./configure && make && make install)
rm -rf patchelf-0.9.tar.gz patchelf-0.9
# FIXME(typhoonzero): restore this when the link is fixed.
# curl -sLO http://nipy.bic.berkeley.edu/manylinux/patchelf-0.9njs2.tar.gz
# check_sha256sum patchelf-0.9njs2.tar.gz $PATCHELF_HASH
# tar -xzf patchelf-0.9njs2.tar.gz
# (cd patchelf-0.9njs2 && ./configure && make && make install)
# rm -rf patchelf-0.9njs2.tar.gz patchelf-0.9njs2
yum install -y patchelf
# Install latest pypi release of auditwheel
LD_LIBRARY_PATH="${ORIGINAL_LD_LIBRARY_PATH}:$(dirname ${PY35_BIN})/lib" $PY35_BIN/pip install auditwheel
......
......@@ -87,6 +87,8 @@ function do_cpython_build {
# NOTE Make libpython shared library visible to python calls below
LD_LIBRARY_PATH="${prefix}/lib" ${prefix}/bin/python get-pip.py
LD_LIBRARY_PATH="${prefix}/lib" ${prefix}/bin/pip install wheel
cd /
ls ${MY_DIR}
local abi_tag=$(LD_LIBRARY_PATH="${prefix}/lib" ${prefix}/bin/python ${MY_DIR}/python-tag-abi-tag.py)
ln -s ${prefix} /opt/python/${abi_tag}
}
......
......@@ -131,8 +131,12 @@ class Timeline(object):
if (k, event.device_id, "CPU") not in self._devices:
pid = self._allocate_pid()
self._devices[(k, event.device_id, "CPU")] = pid
self._chrome_trace.emit_pid("%s:cpu:block:%d" %
(k, event.device_id), pid)
# -1 device id represents CUDA api call
if event.device_id == -1:
self._chrome_trace.emit_pid("%s:cuda_api" % k, pid)
else:
self._chrome_trace.emit_pid(
"%s:cpu:block:%d" % (k, event.device_id), pid)
elif event.type == profiler_pb2.Event.GPUKernel:
if (k, event.device_id, "GPUKernel") not in self._devices:
pid = self._allocate_pid()
......@@ -150,7 +154,9 @@ class Timeline(object):
pid = self._devices[(k, event.device_id, type)]
args = {'name': event.name}
if event.memcopy.bytes > 0:
args = {'mem_bytes': event.memcopy.bytes}
args['mem_bytes'] = event.memcopy.bytes
if event.detail_info:
args['detail_info'] = event.detail_info
# TODO(panyx0718): Chrome tracing only handles ms. However, some
# ops takes micro-seconds. Hence, we keep the ns here.
self._chrome_trace.emit_region(
......@@ -173,7 +179,7 @@ if args.timeline_path:
profile_paths = profile_path.split(',')
profile_dict = dict()
if len(profile_paths) == 1:
with open(profile_path, 'r') as f:
with open(profile_path, 'rb') as f:
profile_s = f.read()
profile_pb = profiler_pb2.Profile()
profile_pb.ParseFromString(profile_s)
......@@ -181,7 +187,7 @@ if len(profile_paths) == 1:
else:
for profile_path in profile_paths:
k, v = profile_path.split('=')
with open(v, 'r') as f:
with open(v, 'rb') as f:
profile_s = f.read()
profile_pb = profiler_pb2.Profile()
profile_pb.ParseFromString(profile_s)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册