Unverified commit dfb47986, authored by liutiexing, committed by GitHub

Profile Executors (#41100)

* Profile Executors

* update

* fix ut

* fix names

* update

* update
Parent f87f0656
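
Most hunks below share one pattern: a platform::RecordEvent that was constructed as an unnamed temporary is now bound to a named local (record, record_run, record_wait, ...). In C++ an unnamed temporary is destroyed at the end of its full-expression, so the old guards closed their profiling range immediately and covered none of the work that followed; a named local lives until the enclosing scope exits. Below is a minimal sketch of that pitfall, assuming only that the guard records a range between its constructor and destructor; ScopedRange is an invented stand-in, not Paddle's real RecordEvent.

// ScopedRange: invented stand-in for a scope-guard profiling event.
#include <chrono>
#include <iostream>
#include <string>

class ScopedRange {
 public:
  explicit ScopedRange(std::string name)
      : name_(std::move(name)), start_(std::chrono::steady_clock::now()) {}
  ~ScopedRange() {
    auto us = std::chrono::duration_cast<std::chrono::microseconds>(
                  std::chrono::steady_clock::now() - start_)
                  .count();
    std::cout << name_ << ": " << us << " us\n";
  }

 private:
  std::string name_;
  std::chrono::steady_clock::time_point start_;
};

void Work() {
  // Simulate the code the range is supposed to cover.
  for (volatile int i = 0; i < 10000000; i = i + 1) {
  }
}

int main() {
  {
    ScopedRange("buggy");  // unnamed temporary: destroyed on this very line
    Work();                // runs outside the recorded range (~0 us)
  }
  {
    ScopedRange record("fixed");  // named local: destroyed at end of scope
    Work();                       // runs inside the recorded range
  }
  return 0;
}

Naming the guard also allows closing it early, which appears to be what record_prepare.End() does in InterpreterCore::ExecuteInstructionList below.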
@@ -132,6 +132,9 @@ FetchResultType FastThreadedSSAGraphExecutor::Run(
   }
   // Wait FetchOps.
   if (!fetch_ops.empty()) {
+    platform::RecordEvent record_wait(
+        "FastThreadedSSAGraphExecutor::WaitFetchOps",
+        platform::TracerEventType::Operator, 1);
     ClearFetchOp(graph_, &fetch_ops);
     for (auto &place : places_) {
@@ -231,8 +234,9 @@ void FastThreadedSSAGraphExecutor::RunOpAsync(
     OpHandleBase *op,
     const std::shared_ptr<BlockingQueue<size_t>> &complete_q) {
   ++remaining_;
-  platform::RecordEvent("WorkQueue::AddTask",
-                        platform::TracerEventType::UserDefined, 10 /*level*/);
+  platform::RecordEvent record("WorkQueue::AddTask",
+                               platform::TracerEventType::UserDefined,
+                               10 /*level*/);
   this->pool_->enqueue([=] {
     std::deque<OpHandleBase *> op_queue;
     op_queue.push_front(op);
...
@@ -172,6 +172,8 @@ void Executor::Run(const ProgramDesc& pdesc, Scope* scope, int block_id,
                    bool create_local_scope, bool create_vars,
                    const std::vector<std::string>& skip_ref_cnt_vars,
                    bool force_disable_gc, bool keep_kid_scopes) {
+  platform::RecordEvent record_run("Executor::Run",
+                                   platform::TracerEventType::UserDefined, 1);
   platform::RecordBlock b(block_id);
   if (FLAGS_use_mkldnn) EnableMKLDNN(pdesc);
   auto ctx = Prepare(pdesc, block_id, skip_ref_cnt_vars, force_disable_gc);
@@ -301,6 +303,8 @@ void Executor::Run(const ProgramDesc& program, Scope* scope,
                    bool create_local_scope, bool create_vars,
                    const std::string& feed_holder_name,
                    const std::string& fetch_holder_name) {
+  platform::RecordEvent record_run("Executor::Run",
+                                   platform::TracerEventType::UserDefined, 1);
   platform::RecordBlock b(kProgramId);
   if (FLAGS_use_mkldnn) EnableMKLDNN(program);
 #ifdef PADDLE_WITH_MKLDNN
@@ -428,6 +432,8 @@ void Executor::RunPartialPreparedContext(ExecutorPrepareContext* ctx,
                                          int64_t end_op_index,
                                          bool create_local_scope,
                                          bool create_vars, bool keep_kids) {
+  platform::RecordEvent record_run("Executor::RunPartialPreparedContext",
+                                   platform::TracerEventType::UserDefined, 1);
   platform::RecordBlock b(kProgramId);
   PADDLE_ENFORCE_NOT_NULL(
       scope, platform::errors::InvalidArgument("Scope shouldn't be null"));
@@ -518,6 +524,8 @@ void Executor::RunPartialPreparedContext(ExecutorPrepareContext* ctx,
     auto& op = ctx->ops_[i];
     op->Run(*local_scope, place_);
     if (gc) {
+      platform::RecordEvent record("CheckGC",
+                                   platform::TracerEventType::UserDefined, 10);
       DeleteUnusedTensors(*local_scope, op.get(), ctx->unused_vars_, gc.get());
     }
   }
...
@@ -44,6 +44,19 @@ double CostData::GetWholeMemoryBytes() const { return whole_memory_bytes_; }
 const Graph* CostData::GetGraph() const { return graph_; }
 const ProgramDesc* CostData::GetProgram() const { return program_; }
+static bool StringHasEnding(const std::string& full,
+                            const std::string& ending) {
+  if (full.length() < ending.length()) {
+    return false;
+  }
+  if (full.length() == ending.length()) {
+    return full == ending;
+  }
+  size_t prefix_len = full.length() - ending.length();
+  return 0 == full.compare(prefix_len, ending.length(), ending) &&
+         full[prefix_len - 1] == '/';
+}
+
 bool CostData::SetCostData(const ProgramDesc& program,
                            const std::vector<std::vector<Event>>& time_events) {
   // TODO(zhhsplendid): Make a copy so that CostData can be available even if
@@ -77,7 +90,7 @@ bool CostData::SetCostData(const ProgramDesc& program,
     std::string op_type = op_desc->Type();
     while (event_index < main_thread_events.size()) {
-      if (main_thread_events[event_index].name() == op_type &&
+      if (StringHasEnding(main_thread_events[event_index].name(), op_type) &&
           main_thread_events[event_index].type() ==
               platform::EventType::kPushRange) {
         break;
@@ -97,7 +110,7 @@ bool CostData::SetCostData(const ProgramDesc& program,
       // ControlFlow Op can be like that, but this version only support global
       // block
       // TODO(zhhsplendid): make a more strict mapping between push and pop
-      if (main_thread_events[event_index].name() == op_type &&
+      if (StringHasEnding(main_thread_events[event_index].name(), op_type) &&
          main_thread_events[event_index].type() ==
              platform::EventType::kPopRange) {
        break;
...
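
The new StringHasEnding helper switches the cost model from exact event-name matching to suffix matching: an event matches op_type when its name either equals op_type exactly or ends with '/' followed by op_type, presumably because profiler event names can now carry a path-like prefix that would break the old equality test. Here is a standalone copy of the helper with a few illustrative checks; the example event names are invented, not taken from the commit.

// Standalone copy of the StringHasEnding helper, plus sanity checks.
#include <cassert>
#include <string>

static bool StringHasEnding(const std::string& full,
                            const std::string& ending) {
  if (full.length() < ending.length()) {
    return false;
  }
  if (full.length() == ending.length()) {
    return full == ending;
  }
  size_t prefix_len = full.length() - ending.length();
  return 0 == full.compare(prefix_len, ending.length(), ending) &&
         full[prefix_len - 1] == '/';
}

int main() {
  assert(StringHasEnding("matmul", "matmul"));         // exact match
  assert(StringHasEnding("block0/matmul", "matmul"));  // '/'-separated suffix
  assert(!StringHasEnding("fused_matmul", "matmul"));  // no '/' separator
  assert(!StringHasEnding("mul", "matmul"));           // shorter than suffix
  return 0;
}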
@@ -13,6 +13,7 @@
 // limitations under the License.
 #include "paddle/fluid/framework/new_executor/event_manager.h"
+#include "paddle/fluid/platform/profiler/event_tracing.h"
 namespace paddle {
 namespace framework {
@@ -24,6 +25,8 @@ void WaitEvent(const Instruction& instruction, const platform::Place& place) {
   VLOG(3) << "Deal StreamWaitEventOrSync for " << instruction.OpBase()->Type();
   for (auto& event_iter : instruction.InputEvents()) {
+    platform::RecordEvent record("WaitStreamEvent",
+                                 platform::TracerEventType::UserDefined, 10);
     VLOG(3) << "wait var_id: " << event_iter.var_id_
             << " 's event with waiter_type: " << event_iter.waiter_type_;
     event_iter.event_->Wait(event_iter.waiter_type_,
@@ -36,6 +39,8 @@ void RecordEvent(const Instruction& instruction, const platform::Place& place) {
   if (platform::is_cpu_place(place)) return;
   for (auto& event : instruction.OutputEvents()) {
+    platform::RecordEvent record("RecordStreamEvent",
+                                 platform::TracerEventType::UserDefined, 10);
     VLOG(3) << "Record event in out_var_id: " << event.var_id_;
     event.event_->Record(&instruction.DeviceContext());
   }
@@ -46,6 +51,8 @@ void RecordEvent(const Instruction& instruction) {
   if (platform::is_cpu_place(instruction.DeviceContext().GetPlace())) return;
   for (auto& event : instruction.OutputEvents()) {
+    platform::RecordEvent record("RecordStreamEvent",
+                                 platform::TracerEventType::UserDefined, 10);
     VLOG(3) << "Record event in out_var_id: " << event.var_id_;
     event.event_->Record(&instruction.DeviceContext());
   }
...
@@ -489,6 +489,8 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
   VLOG(4) << "End run " << place << " " << op->DebugStringEx(global_scope_);
   if (!instr_node.InplaceBackMap().empty()) {
+    platform::RecordEvent inplaceback_event(
+        "InplaceVarsBack", platform::TracerEventType::UserDefined, 10);
     auto& m = instr_node.InplaceBackMap();
     // NOTE(zhiqiu): same logic as TransferInplaceVarsBack() in operator.cc
     for (auto& p : m) {
@@ -530,6 +532,8 @@ void InterpreterCore::ExecuteInstructionList(
     return;
   }
+  platform::RecordEvent record_prepare(
+      "PrepareAtomic", platform::TracerEventType::UserDefined, 1);
   // NOTE(zhiqiu): get the prepared deps from std::future, and async prepare
   // those for the next step
   auto atomic_deps = async_work_queue_->AtomicDeps();
@@ -537,6 +541,7 @@ void InterpreterCore::ExecuteInstructionList(
       async_work_queue_->PrepareAtomicDeps(dependecy_count_);
   async_work_queue_->PrepareAtomicVarRef(global_scope_->VecMetaInfo());
+  record_prepare.End();
   exception_holder_.Clear();
@@ -573,6 +578,9 @@ void InterpreterCore::RunNextInstructions(
     const Instruction& instr, std::queue<size_t>* reserved_next_ops,
     std::vector<std::atomic<size_t>>* atomic_deps,
     std::vector<std::atomic<size_t>>* atomic_var_ref) {
+  platform::RecordEvent record("RunNextInstructions",
+                               platform::TracerEventType::UserDefined, 10);
+
   VLOG(4) << "atomic 1:" << atomic_deps;
   auto& next_instr = instr.NextInstructions();
   auto IsReady = [atomic_deps](size_t next_id) {
@@ -708,6 +716,8 @@ void InterpreterCore::RecordStreamForGC(const Instruction& instr) {
       instr.KernelType() != OpFuncType::kQueueAsync) {
     return;
   }
+  platform::RecordEvent record("RecordStreamForGC",
+                               platform::TracerEventType::UserDefined, 10);
   gpuStream_t stream = reinterpret_cast<const platform::CUDADeviceContext&>(
                            instr.DeviceContext())
void InterpreterCore::CheckGC(
const Instruction& instr,
std::vector<std::atomic<size_t>>* atomic_var_ref) {
platform::RecordEvent record("CheckGC",
platform::TracerEventType::UserDefined, 10);
size_t instr_id = instr.Id();
auto& var_scope = *global_scope_;
......
@@ -408,7 +408,7 @@ class ThreadPoolTempl {
       ec_.Notify(true);
       return false;
     }
-    platform::RecordEvent("SleepWaitForWork",
+    platform::RecordEvent record("WaitForWork",
                           platform::TracerEventType::UserDefined, 10);
     ec_.CommitWait(waiter);
     blocked_--;
...
@@ -55,8 +55,9 @@ class WorkQueueImpl : public WorkQueue {
   }
   void AddTask(std::function<void()> fn) override {
-    platform::RecordEvent("WorkQueue::AddTask",
-                          platform::TracerEventType::UserDefined, 10 /*level*/);
+    platform::RecordEvent record("WorkQueue::AddTask",
+                                 platform::TracerEventType::UserDefined,
+                                 10 /*level*/);
     if (tracker_ != nullptr) {
       fn = [
         task = std::move(fn), raii = CounterGuard<TaskTracker>(tracker_)
@@ -146,8 +147,9 @@ WorkQueueGroupImpl::~WorkQueueGroupImpl() {
 }
 void WorkQueueGroupImpl::AddTask(size_t queue_idx, std::function<void()> fn) {
-  platform::RecordEvent("WorkQueue::AddTask",
-                        platform::TracerEventType::UserDefined, 10 /*level*/);
+  platform::RecordEvent record("WorkQueue::AddTask",
+                               platform::TracerEventType::UserDefined,
+                               10 /*level*/);
   assert(queue_idx < queues_.size());
   if (queues_options_.at(queue_idx).track_task) {
     fn = [
...
@@ -916,6 +916,8 @@ void ParallelExecutor::BCastParamsToDevices(
 FetchResultType ParallelExecutor::Run(
     const std::vector<std::string> &fetch_tensors, bool return_merged) {
+  platform::RecordEvent record_run("ParallelExecutor::Run",
+                                   platform::TracerEventType::UserDefined, 1);
   VLOG(3) << "enter ParallelExecutor Run";
 #ifdef PADDLE_WITH_CUDA
   if (platform::IsCUDAGraphCapturing()) {
...
@@ -48,8 +48,9 @@ AutoGrowthBestFitAllocator::AutoGrowthBestFitAllocator(
 phi::Allocation *AutoGrowthBestFitAllocator::AllocateImpl(
     size_t unaligned_size) {
-  platform::RecordEvent("AutoGrowthBestFitAllocator::Allocate",
-                        platform::TracerEventType::UserDefined, 9 /*level*/);
+  platform::RecordEvent record("AutoGrowthBestFitAllocator::Allocate",
+                               platform::TracerEventType::UserDefined,
+                               9 /*level*/);
   size_t size = AlignedSize(unaligned_size, alignment_);
   VLOG(10) << "Allocate " << unaligned_size << " bytes, aligned to " << size;
@@ -111,8 +112,9 @@ phi::Allocation *AutoGrowthBestFitAllocator::AllocateImpl(
 }
 void AutoGrowthBestFitAllocator::FreeImpl(phi::Allocation *allocation) {
-  platform::RecordEvent("AutoGrowthBestFitAllocator::Free",
-                        platform::TracerEventType::UserDefined, 9 /*level*/);
+  platform::RecordEvent record("AutoGrowthBestFitAllocator::Free",
+                               platform::TracerEventType::UserDefined,
+                               9 /*level*/);
   VLOG(10) << "Free " << allocation->size()
            << " bytes, ptr = " << allocation->ptr();
   std::lock_guard<SpinLock> guard(spinlock_);
...
@@ -163,8 +163,9 @@ void StreamSafeCUDAAllocator::SetDefaultStream(gpuStream_t stream) {
 }
 phi::Allocation* StreamSafeCUDAAllocator::AllocateImpl(size_t size) {
-  platform::RecordEvent("StreamSafeCUDAAllocator::Allocate",
-                        platform::TracerEventType::UserDefined, 9 /*level*/);
+  platform::RecordEvent record("StreamSafeCUDAAllocator::Allocate",
+                               platform::TracerEventType::UserDefined,
+                               9 /*level*/);
   ProcessUnfreedAllocations();
   VLOG(8) << "Try allocate " << size << " bytes";
   AllocationPtr underlying_allocation;
@@ -192,8 +193,9 @@ phi::Allocation* StreamSafeCUDAAllocator::AllocateImpl(size_t size) {
 }
 void StreamSafeCUDAAllocator::FreeImpl(phi::Allocation* allocation) {
-  platform::RecordEvent("StreamSafeCUDAAllocator::Free",
-                        platform::TracerEventType::UserDefined, 9 /*level*/);
+  platform::RecordEvent record("StreamSafeCUDAAllocator::Free",
+                               platform::TracerEventType::UserDefined,
+                               9 /*level*/);
   StreamSafeCUDAAllocation* stream_safe_cuda_allocation =
       static_cast<StreamSafeCUDAAllocation*>(allocation);
...
@@ -2867,7 +2867,7 @@ All parameter, weight, gradient are variables in Paddle.
           [](StandaloneExecutor &self, std::vector<std::string> feed_names,
              std::vector<std::string> fetch_names) {
             platform::RecordEvent record_event(
-                "StandaloneExecutor:run",
+                "StandaloneExecutor::run",
                 platform::TracerEventType::UserDefined, 1);
             paddle::framework::FetchList ret;
             {
...