未验证 提交 1ba1627d 编写于 作者: L Leo Chen 提交者: GitHub

add timer to log deps in executor (#54188)

* add timer to log deps

* rename flag

* add ut
上级 dff77c23
......@@ -54,6 +54,7 @@ PADDLE_DEFINE_EXPORTED_bool(new_executor_use_local_scope,
PHI_DECLARE_bool(check_nan_inf);
DECLARE_bool(benchmark);
// Interval, in microseconds, between dependency dumps emitted while the
// executor waits for its instructions to finish; a value of 0 means no
// logging task is started.
DECLARE_uint64(executor_log_deps_every_microseconds);
PHI_DECLARE_bool(new_executor_use_cuda_graph);
#if defined(PADDLE_WITH_CUDA) || defined(PADDLE_WITH_HIP)
PHI_DECLARE_bool(sync_nccl_allreduce);
......@@ -1055,6 +1056,25 @@ void InterpreterCore::RunInstruction(const Instruction& instr_node) {
}
}
// Builds a multi-line, human-readable snapshot of the dependency state:
// a short explanatory note, the current unfinished_op_number_, and then one
// line per op giving its index, type, static/dynamic dependency counters and
// the ids of its downstream ops.
std::string InterpreterCore::GetDepsString() const {
  // Held by value (plain `auto`), so the operator[] lookup below acts on a
  // local object rather than on the dependency builder's own data.
  // NOTE(review): if OpDownstreamMap() returns a reference this copies the
  // whole container on every call - confirm whether that is intended.
  auto downstream_of = dependency_builder_.OpDownstreamMap();

  std::stringstream out;
  out << "Note: when static_dep is 1, it is ok that the dynamic_dep will not "
         "be decreased to 0."
      << '\n';
  out << "unfinished_op_number_:" << unfinished_op_number_ << '\n';

  for (size_t op_id = 0; op_id < deps_.size(); ++op_id) {
    const auto& dep_info = deps_[op_id];
    out << "op:" << op_id
        << ", type: " << vec_instruction_[op_id].OpBase()->Type();
    out << ", static_dep:" << dep_info->StaticDep();
    out << ", dynamic_dep:" << dep_info->DynamicDep() << ", downstream op: ";
    for (auto next_id : downstream_of[op_id]) {
      out << next_id << ", ";
    }
    out << '\n';
  }
  return out.str();
}
void InterpreterCore::ExecuteInstructionList(
const std::vector<Instruction>& vec_instr) {
unfinished_op_number_ = vec_instr.size();
......@@ -1078,10 +1098,45 @@ void InterpreterCore::ExecuteInstructionList(
}
}
// To help debug hangs in main_thread_blocker_.WaitEvent(), launch an async
// task that logs the dependency state every
// FLAGS_executor_log_deps_every_microseconds, and cancel it once
// main_thread_blocker_.WaitEvent() has returned. std::async is used instead
// of the workqueue so that the logging thread itself cannot affect the
// workqueue used by the interpretercore.
std::future<int> logged_times;
std::atomic_bool cancel_log{false};
if (FLAGS_executor_log_deps_every_microseconds) {
  logged_times = std::async(
      std::launch::async,
      [this](const std::atomic_bool& cancel) {
        // Longest single sleep. The interval is slept in slices of at most
        // this length so that a cancel request is noticed promptly instead
        // of only after a whole (possibly very long) interval has elapsed;
        // otherwise the main thread would stall on the future below.
        const std::chrono::microseconds max_nap(1000);
        int times = 0;
        while (!cancel) {
          std::chrono::microseconds remaining(
              FLAGS_executor_log_deps_every_microseconds);
          while (!cancel && remaining.count() > 0) {
            const std::chrono::microseconds nap =
                remaining < max_nap ? remaining : max_nap;
            std::this_thread::sleep_for(nap);
            remaining -= nap;
          }
          // check again, since cancel may be changed during sleep
          if (cancel) {
            break;
          }
          VLOG(0) << "deps:\n" << GetDepsString();
          times++;
        }
        return times;
      },
      std::ref(cancel_log));
}
auto event_name = main_thread_blocker_.WaitEvent();
VLOG(1) << "main_thread_blocker_(" << &main_thread_blocker_
        << ") got event_name: " << event_name;
// Ask the logging task (if one was started) to stop.
cancel_log = true;
if (logged_times.valid()) {
  // get() waits for the logging task to finish and yields how many times it
  // logged.
  VLOG(1) << "Logged deps for " << logged_times.get() << " times";
}
if (UNLIKELY(exception_holder_.IsCaught())) {
VLOG(1) << "Exception caught " << exception_holder_.Type();
// Graceful exit when the executor encountered a fatal error.
......
......@@ -130,6 +130,9 @@ class InterpreterCore {
// scope
bool HasLocalScope() const;
// For log and debug: returns a multi-line string listing, for every op, its
// type, static and dynamic dependency counters and its downstream op ids.
std::string GetDepsString() const;
private:
bool is_build_{false};
bool static_build_{false};
......
......@@ -345,6 +345,8 @@ class OpDepInfo {
// Returns true immediately when static_dep_ is 1, leaving dynamic_dep_
// untouched. Otherwise decrements dynamic_dep_ by one and returns true only
// if its value before the decrement was 1.
bool CheckAndDecrease() {
  if (static_dep_ == 1) {
    return true;
  }
  return dynamic_dep_.fetch_sub(1) == 1;
}
// Returns static_dep_, which is a const member of this object.
size_t StaticDep() const { return static_dep_; }
// Returns the current value of the dynamic_dep_ counter, the same counter
// that CheckAndDecrease() decrements.
size_t DynamicDep() const {
  return dynamic_dep_.load();
}
private:
const size_t static_dep_;
......
......@@ -1044,6 +1044,18 @@ PHI_DEFINE_EXPORTED_bool(new_executor_use_cuda_graph,
false,
"Use CUDA Graph in new executor");
/*
 * Executor related FLAG
 * Name: FLAGS_executor_log_deps_every_microseconds
 * Since Version: 2.5
 * Value Range: uint64, default=0
 * Example: FLAGS_executor_log_deps_every_microseconds=n (n>0) makes the new
 *          executor log its op dependency state every n microseconds.
 * Note: with the default value 0 the periodic logging task is not started.
 */
PHI_DEFINE_EXPORTED_uint64(executor_log_deps_every_microseconds,
0,
"Enable new executor log deps every n microseconds");
// Maximum size of the SlotRecordDataset slot record pool (as stated by the
// flag's help text); defaults to 2000000.
DEFINE_int32(record_pool_max_size,
2000000,
"SlotRecordDataset slot record pool max size");
......
......@@ -20,6 +20,10 @@ py_test_modules(
test_standalone_executor_serial_run MODULES test_standalone_executor ENVS
FLAGS_new_executor_serial_run=true)
# Runs the test_standalone_executor module again with periodic dependency
# logging turned on (every 1000 microseconds) and GLOG_v=1.
py_test_modules(
test_standalone_executor_log_deps MODULES test_standalone_executor ENVS
GLOG_v=1 FLAGS_executor_log_deps_every_microseconds=1000)
# Runs the test_standalone_executor module with FLAGS_host_trace_level=10 and
# FLAGS_static_executor_perfstat_filepath pointing at ./perfstat.
py_test_modules(
test_standalone_executor_stats MODULES test_standalone_executor ENVS
FLAGS_host_trace_level=10 FLAGS_static_executor_perfstat_filepath=./perfstat)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册