Commit 59d59766 authored by Megvii Engine Team

fix(profiler): collect records when thread exit

GitOrigin-RevId: fb796dd61cf66de028e52a4e9dc8ba3484640449
Parent a841d8e7
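The heart of this fix sits in the Profiler header hunks near the end of the diff: a new static `sm_records` store, a destructor that appends a thread's local records to it, and a collection loop that starts from it, so events recorded on threads that exit before collection are no longer lost. Below is a minimal, compilable sketch of that pattern, assuming simplified types; only `Record`, `sm_mutex`, `sm_records` and `sm_profilers` mirror names from the diff, while `collect_all` and everything else are illustrative and not MegEngine's actual API.

```cpp
// Sketch of "collect records when thread exits" (simplified, illustrative).
#include <mutex>
#include <thread>
#include <unordered_map>
#include <utility>
#include <vector>

struct Record { /* event payload */ };

class Profiler {
public:
    Profiler() : m_thread_id(std::this_thread::get_id()) {
        std::lock_guard<std::mutex> lock(sm_mutex);
        sm_profilers[m_thread_id] = this;
    }
    ~Profiler() {
        std::lock_guard<std::mutex> lock(sm_mutex);
        sm_profilers.erase(m_thread_id);
        // Key change: a dying thread's records are kept in a static store
        // instead of vanishing together with the thread-local Profiler.
        sm_records.insert(sm_records.end(), m_records.begin(), m_records.end());
    }
    void record(Record r) { m_records.push_back(std::move(r)); }

    // Collection starts from the records of already-exited threads,
    // then drains the buffers of threads that are still alive.
    static std::vector<Record> collect_all() {
        std::lock_guard<std::mutex> lock(sm_mutex);
        std::vector<Record> out = std::move(sm_records);
        sm_records.clear();
        for (auto&& [tid, profiler] : sm_profilers) {
            out.insert(out.end(), profiler->m_records.begin(), profiler->m_records.end());
            profiler->m_records.clear();
        }
        return out;
    }

private:
    std::thread::id m_thread_id;
    std::vector<Record> m_records;

    static std::mutex sm_mutex;
    static std::vector<Record> sm_records;
    static std::unordered_map<std::thread::id, Profiler*> sm_profilers;
};

std::mutex Profiler::sm_mutex;
std::vector<Record> Profiler::sm_records;
std::unordered_map<std::thread::id, Profiler*> Profiler::sm_profilers;
```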
......@@ -53,10 +53,6 @@ def main():
if args.format:
prof_args["formats"] = args.format
- if args.clean:
- for file in os.listdir(profiler.directory):
- os.remove(os.path.join(profiler.directory, file))
if len(extras) == 0:
if not args.merge_trace_events:
parser.print_usage()
......@@ -74,9 +70,14 @@ def main():
sys.argv[:] = [filename, *extras[1:]]
profiler = Profiler(**prof_args)
+ if args.clean:
+ for file in os.listdir(profiler.directory):
+ os.remove(os.path.join(profiler.directory, file))
with profiler:
if args.module:
- runpy.run_module(filename)
+ run_module(filename)
else:
run_script(filename)
profiler.dump()
......
......@@ -11,10 +11,13 @@
#include "megbrain/imperative.h"
#include "megbrain/imperative/blob_manager.h"
#include "megbrain/imperative/profiler.h"
#include "./event_pool.h"
#include "./async_releaser.h"
#include "./profiler/events.h"
#include <mutex>
namespace mgb {
......@@ -128,9 +131,11 @@ Tensor::Tensor(BlobPtr blob, const TensorLayout& layout, size_t offset, const Ho
Tensor::Tensor(const HostTensorND &hv)
: Tensor(hv.layout(), hv.comp_node()) {
m_value = hv;
+ MGB_RECORD_EVENT(profiler::HostToDeviceEvent, hv.layout(), hv.comp_node(), hv.raw_ptr(), dev_tensor().raw_ptr());
dev_tensor().copy_from_fixlayout(hv);
// even though hv is saved in m_value, Tensor itself could be
// released before copy completes
+ MGB_RECORD_EVENT(profiler::HostToDeviceFinishEvent, hv.layout(), hv.comp_node(), hv.raw_ptr(), dev_tensor().raw_ptr());
AsyncReleaser::inst()->add(hv);
}
......
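The comment in the Tensor constructor above spells out why the host tensor is handed to `AsyncReleaser` after the new `HostToDevice` events are recorded: `copy_from_fixlayout` is asynchronous, so the host buffer must outlive the Tensor until the device has actually read it. A runnable sketch of that lifetime hazard, assuming stand-in `HostBuffer`/`async_copy` types that are not MegEngine code:

```cpp
#include <chrono>
#include <functional>
#include <memory>
#include <thread>
#include <vector>

using HostBuffer = std::vector<float>;

// Stand-in for an asynchronous host-to-device copy: a worker thread plays
// the device, reads 'src' later, then runs 'on_done'.
std::thread async_copy(std::shared_ptr<const HostBuffer> src, std::function<void()> on_done) {
    return std::thread([src = std::move(src), on_done = std::move(on_done)] {
        std::this_thread::sleep_for(std::chrono::milliseconds(10));  // simulated DMA latency
        float sum = 0;
        for (float v : *src) sum += v;  // the "device" consumes the host buffer here
        (void)sum;
        on_done();
    });
}

int main() {
    auto hv = std::make_shared<HostBuffer>(1024, 1.f);
    // Capturing 'hv' in the completion callback plays the role of
    // AsyncReleaser::inst()->add(hv): the host buffer stays alive until the
    // copy finishes, even though the caller drops its reference right away.
    auto worker = async_copy(hv, [hv] { /* last reference dropped after the copy */ });
    hv.reset();  // safe: the in-flight copy still owns the buffer
    worker.join();
}
```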
......@@ -37,11 +37,12 @@ std::shared_ptr<CompNode::Event> Timer::record_device(CompNode device) {
return event;
}
+ std::vector<Profiler::entry_t> Profiler::sm_records;
Profiler::options_t Profiler::sm_profile_options;
std::mutex Profiler::sm_mutex;
std::unordered_map<std::thread::id, Profiler*> Profiler::sm_profilers;
Timer Profiler::sm_timer;
- profiler::HostTime Profiler::sm_start_at;
+ profiler::HostTime Profiler::sm_start_at = profiler::HostTime::min();
std::atomic_uint64_t Profiler::sm_last_id = 0;
bool Profiler::sm_profiling = false;
thread_local std::unique_ptr<Profiler> Profiler::tm_profiler = std::make_unique<Profiler>();
......@@ -50,13 +51,16 @@ std::atomic_size_t Profiler::sm_preferred_capacity;
auto Profiler::get_thread_dict() -> thread_dict_t {
thread_dict_t thread_dict;
for (auto&& [tid, profiler]: sm_profilers) {
- thread_dict[tid] = profiler->m_thread_name;
+ thread_dict[tid] = sys::get_thread_name(tid);
}
return thread_dict;
}
void Profiler::dump_profile(std::string basename, std::string format, bundle_t result) {
- std::unordered_map<std::string, void(*)(std::string, bundle_t)> format_table;
+ static std::unordered_map<std::string, void(*)(std::string, bundle_t)> format_table = {
+ {"chrome_timeline.json", profiler::dump_chrome_timeline},
+ {"memory_flow.svg", profiler::dump_memory_flow},
+ };
auto iter = format_table.find(format);
if (iter == format_table.end()) {
mgb_log_error("unsupported profiling format %s", format.c_str());
......
......@@ -86,10 +86,6 @@ public:
m_args[key] = value;
return *this;
}
- ChromeTraceEvent& arg(std::string key, nlohmann::json value) {
- m_args[key] = value;
- return *this;
- }
ChromeTraceEvent& stack(Trace trace) {
m_stack = std::move(trace);
return *this;
......@@ -163,7 +159,7 @@ public:
return m_content.back();
}
- std::string metadata(std::string key) {
+ std::string& metadata(std::string key) {
return m_metadata[key];
}
......@@ -184,8 +180,8 @@ public:
std::string to_string() const {
auto json = to_json();
return "{" "traceEvents:" + nlohmann::to_string(json["traceEvents"]) + ","
"metadata:" + nlohmann::to_string(json["metadata"]) + "}";
return "{" "\"traceEvents\":" + nlohmann::to_string(json["traceEvents"]) + ","
"\"metadata\":" + nlohmann::to_string(json["metadata"]) + "}";
}
private:
std::vector<ChromeTraceEvent> m_content;
......@@ -360,20 +356,40 @@ struct ChromeTimelineEventVisitor: EventVisitor<ChromeTimelineEventVisitor> {
new_host_event("AutoEvict", 'B');
} else if constexpr (std::is_same_v<TEvent, AutoEvictFinishEvent>) {
new_host_event("AutoEvict", 'E');
+ } else if constexpr (std::is_same_v<TEvent, HostToDeviceEvent>) {
+ new_device_event("HostToDevice", 'B', event.device);
+ } else if constexpr (std::is_same_v<TEvent, HostToDeviceFinishEvent>) {
+ new_device_event("HostToDevice", 'E', event.device)
+ .arg("shape", event.layout.TensorShape::to_string())
+ .arg("dtype", event.layout.dtype.name())
+ .arg("nr_elements", event.layout.total_nr_elems())
+ .arg("device", event.device.to_string());
}
}
void notify_counter(std::string key, int64_t old_val, int64_t new_val) {
new_host_event(key, 'C').arg("value", new_val);
}
+ void name_threads(Profiler::thread_dict_t thread_dict) {
+ for (auto&& [tid, tname]: thread_dict) {
+ trace_events.new_event()
+ .name("thread_name")
+ .pid('M')
+ .tid(to_tid(tid))
+ .arg("name", tname);
+ }
+ }
};
void dump_chrome_timeline(std::string filename, Profiler::bundle_t result){
ChromeTimelineEventVisitor visitor;
visitor.process_events(result);
- visitor.trace_events.metadata("localTime") = std::to_string(result.start_at.time_since_epoch().count());
- std::string json_repr = visitor.trace_events.to_string();
+ visitor.name_threads(result.thread_dict);
+ auto trace_events = std::move(visitor.trace_events);
+ trace_events.metadata("localTime") = std::to_string(result.start_at.time_since_epoch().count());
+ std::string json_repr = trace_events.to_string();
mgb::debug::write_to_file(filename.c_str(), json_repr);
}
......
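Two small fixes in the Chrome trace writer above are related: `metadata()` now returns a reference, because the old by-value return made assignments like `metadata("localTime") = ...` write into a temporary and silently drop the value, and `to_string()` now quotes the `traceEvents`/`metadata` keys so the output is valid JSON. A tiny sketch of the by-value pitfall, using a made-up `Store` class:

```cpp
#include <cassert>
#include <map>
#include <string>

struct Store {
    std::map<std::string, std::string> kv;
    std::string by_value(std::string key) { return kv[key]; }       // old behaviour
    std::string& by_reference(std::string key) { return kv[key]; }  // fixed behaviour
};

int main() {
    Store s;
    s.by_value("localTime") = "123";      // assigns into a temporary, kv stays empty
    assert(s.kv["localTime"].empty());
    s.by_reference("localTime") = "123";  // writes through the reference
    assert(s.kv["localTime"] == "123");
}
```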
......@@ -234,7 +234,8 @@ public:
SampleDeviceEvent, WorkerExceptionEvent, ShapeInferEvent, SyncEvent, SyncFinishEvent,
StartProfileEvent, StartProfileFinishEvent, StopProfileEvent, StopProfileFinishEvent,
TensorCommandEvent, TensorCommandFinishEvent, AutoEvictEvent, AutoEvictFinishEvent,
- CustomEvent, CustomFinishEvent, RecordDeviceEvent, ScopeEvent, ScopeFinishEvent> converter;
+ CustomEvent, CustomFinishEvent, RecordDeviceEvent, ScopeEvent, ScopeFinishEvent,
+ HostToDeviceEvent, HostToDeviceFinishEvent> converter;
auto for_each_entry = [&](auto&& handler) {
for (auto& entry: bundle.entries) {
......@@ -308,6 +309,7 @@ public:
if constexpr (is_op_event<T>::value) {
current_op = &m_operators.at(event.op_id);
} else if constexpr (is_tensor_event<T>::value) {
+ mgb_assert(m_tensors.count(event.tensor_id) != 0, "tensor not found");
current_tensor = &m_tensors.at(event.tensor_id);
}
if constexpr (std::is_same_v<T, OpExecuteEvent>) {
......
......@@ -31,6 +31,19 @@ ProfilerPlugin::ProfilerPlugin(cg::ComputingGraph* graph): PluginBase(graph) {
if (m_opr_dict.empty() && m_var_dict.empty()) {
init_seq(event.exec);
}
+ Profiler::record<ScopeEvent>("Constants");
+ for (auto&& [var, var_info]: m_var_dict) {
+ if (var_info->is_const) {
+ bool valid = var->dev_tensor_valid();
+ auto layout = valid ? var->layout() : TensorLayout();
+ var_info->id = Profiler::next_id();
+ Profiler::record<TensorDeclareEvent>(var_info->id, var->name());
+ Profiler::record<TensorProduceEvent>(var_info->id, layout, var->comp_node(), valid ? var->dev_tensor().raw_ptr() : nullptr);
+ } else {
+ var_info->rt_ref_cnt = var_info->ref_cnt;
+ }
+ }
+ Profiler::record<ScopeFinishEvent>("Constants");
Profiler::record<ScopeEvent>("DispatchOprs");
event.exec->iter_opr_seq([this](OperatorNodeBase* opr) -> bool{
auto& opr_info = get_opr_info(opr);
......@@ -44,26 +57,15 @@ ProfilerPlugin::ProfilerPlugin(cg::ComputingGraph* graph): PluginBase(graph) {
}
auto opr_name = opr->dyn_typeinfo()->name;
auto copy_params = [params = opr_info.params] { return *params; };
- Profiler::record<OpDispatchEvent>(opr_info.id, opr_name, copy_params, inputs, outputs);
for (auto output: opr->output()) {
- auto var_id = get_var_info(output).id;
- Profiler::record<TensorDeclareEvent>(var_id);
+ auto& var_id = get_var_info(output).id;
+ var_id = Profiler::next_id();
+ Profiler::record<TensorDeclareEvent>(var_id, output->name());
}
+ Profiler::record<OpDispatchEvent>(opr_info.id, opr_name, copy_params, inputs, outputs);
return true;
});
Profiler::record<ScopeFinishEvent>("DispatchOprs");
- Profiler::record<ScopeEvent>("Constants");
- for (auto&& [var, var_info]: m_var_dict) {
- if (var_info->is_const) {
- bool valid = var->dev_tensor_valid();
- auto layout = valid ? var->layout() : TensorLayout();
- Profiler::record<TensorDeclareEvent>(var_info->id);
- Profiler::record<TensorProduceEvent>(var_info->id, layout, var->comp_node(), valid ? var->dev_tensor().raw_ptr() : nullptr);
- } else {
- var_info->rt_ref_cnt = var_info->ref_cnt;
- }
- }
- Profiler::record<ScopeFinishEvent>("Constants");
};
auto on_opr_start = [this](OprExecStart const& event) {
OperatorNodeBase* opr = event.opr;
......@@ -144,6 +146,7 @@ ProfilerPlugin::ProfilerPlugin(cg::ComputingGraph* graph): PluginBase(graph) {
Profiler::record<TensorReleaseEvent>(var_info->id);
}
Profiler::record<TensorEraseEvent>(var_info->id, var_info->ref_cnt);
+ var_info->id = 0;
}
};
add_event_handler(graph->event().register_receiver<CompSeqExecBeforeStart>(on_seq_start));
......@@ -194,11 +197,12 @@ ProfilerPlugin::OprInfo& ProfilerPlugin::register_opr(cg::OperatorNodeBase *opr)
ProfilerPlugin::VarInfo& ProfilerPlugin::register_var(cg::VarNode *var) {
auto info = std::make_unique<VarInfo>();
- info->id = Profiler::next_id();
+ info->id = 0;
info->is_const = false;
info->ref_cnt = 0;
info->rt_ref_cnt = 0;
- return *m_var_dict.insert({var, std::move(info)}).first->second;
+ mgb_assert(m_var_dict.count(var) == 0, "var exists");
+ return *(m_var_dict[var] = std::move(info));
}
ProfilerPlugin::OprInfo& ProfilerPlugin::get_opr_info(cg::OperatorNodeBase *opr) {
......
......@@ -81,10 +81,9 @@ public:
private:
std::thread::id m_thread_id;
std::vector<Record> m_records;
- std::vector<std::any> m_duration_stack;
std::atomic<Status> m_status = Running;
- std::string m_thread_name;
+ static std::vector<entry_t> sm_records;
static options_t sm_profile_options;
static std::mutex sm_mutex;
static std::unordered_map<std::thread::id, Profiler*> sm_profilers;
......@@ -99,9 +98,6 @@ public:
Profiler() {
m_thread_id = std::this_thread::get_id();
MGB_LOCK_GUARD(sm_mutex);
- if (sm_profilers.size() == 0) {
- reset();
- }
mgb_assert(sm_profilers.count(m_thread_id) == 0);
sm_profilers[m_thread_id] = this;
}
......@@ -109,17 +105,13 @@ public:
MGB_LOCK_GUARD(sm_mutex);
mgb_assert(sm_profilers.count(m_thread_id) == 1);
sm_profilers.erase(m_thread_id);
+ sm_records.insert(sm_records.end(), m_records.begin(), m_records.end());
}
public:
static Profiler& get_instance() {
return *tm_profiler;
}
- static void reset() {
- mgb_assert(sm_profilers.size() == 0, "profiler already running");
- sm_start_at = profiler::HostTime::min();
- }
static uint64_t next_id() {
return sm_last_id++;
}
......@@ -151,12 +143,10 @@ public:
mgb_assert(profiler->m_status.compare_exchange_strong(expected, Collecting));
}
}
- std::vector<entry_t> profile_data;
+ std::vector<entry_t> profile_data = std::move(sm_records);
for (auto&& [tid, profiler]: sm_profilers) {
sm_preferred_capacity = std::max(sm_preferred_capacity.load(), profiler->m_records.size());
- for (auto& record: profiler->m_records) {
- profile_data.push_back(std::move(record));
- }
+ profile_data.insert(profile_data.end(), profiler->m_records.begin(), profiler->m_records.end());
profiler->m_records.clear();
profiler->m_records.reserve(sm_preferred_capacity);
}
......@@ -238,5 +228,10 @@ public:
}
};
+ #define MGB_RECORD_EVENT(type, ...) \
+ if (mgb::imperative::Profiler::is_profiling()) { \
+ mgb::imperative::Profiler::record<type>(type{__VA_ARGS__}); \
+ } \
} // namespace imperative
} // namespace mgb
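The `MGB_RECORD_EVENT` macro added in the last hunk wraps each recording site in an `is_profiling()` check, so the event object (and the evaluation of its arguments) is skipped entirely when no profiling session is active. A self-contained sketch of the same guard pattern, with stand-in `Profiler`/`MyEvent` types that are not MegEngine's:

```cpp
#include <cstdio>
#include <string>

struct Profiler {
    static bool s_on;
    static bool is_profiling() { return s_on; }
    template <typename TEvent>
    static void record(TEvent ev) { std::printf("event: %s\n", ev.name.c_str()); }
};
bool Profiler::s_on = false;

struct MyEvent { std::string name; };

// Same shape as MGB_RECORD_EVENT: the event is only constructed (and its
// arguments only evaluated) when a profiling session is active.
#define RECORD_EVENT(type, ...)                    \
    if (Profiler::is_profiling()) {                \
        Profiler::record<type>(type{__VA_ARGS__}); \
    }

int main() {
    RECORD_EVENT(MyEvent, "dropped");   // profiling off: nothing happens
    Profiler::s_on = true;
    RECORD_EVENT(MyEvent, "recorded");  // profiling on: the event is recorded
}
```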