Commit 1d64792b authored by Megvii Engine Team, committed by huangxinda

refactor(profiler): detach profiler from interpreter

GitOrigin-RevId: f3954728d1dd8e93e2eb5a94ee5f3a030a54fb5a
Parent f2027b8d
......@@ -7,9 +7,14 @@
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
import json
from contextlib import contextmanager
import os
import re
from contextlib import ContextDecorator, contextmanager
from functools import wraps
from typing import List
from weakref import WeakSet
from .. import _atexit
from ..core._imperative_rt.core2 import (
pop_scope,
push_scope,
......@@ -17,9 +22,13 @@ from ..core._imperative_rt.core2 import (
stop_profile,
sync,
)
from ..logger import get_logger
_running_profiler = None
_living_profilers = WeakSet()
class Profiler:
class Profiler(ContextDecorator):
r"""
Profile graph execution in imperative mode.
......@@ -35,9 +44,10 @@ class Profiler:
from megengine.utils.profiler import Profiler
# With Learnable Parameters
profiler = Profiler()
for iter in range(0, 10):
# Only the profile record of the last iter will be saved
with Profiler("profile"):
with profiler:
# your code here
# Then open the profile file in the Chrome timeline window
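
# Since Profiler is a ContextDecorator, an instance can also be applied
# as a decorator (a minimal sketch; `train_step` is hypothetical):

@profiler
def train_step(data):
    ...  # profiled code here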
......@@ -45,46 +55,105 @@ class Profiler:
CHROME_TIMELINE = "chrome_timeline.json"
COMMAND = 1 << 0
OPERATOR = 1 << 1
TENSOR_LIFETIME = 1 << 2
TENSOR_PROP = 1 << 3
SYNC = 1 << 4
SCOPE = 1 << 5
ALL = (1 << 6) - 1
valid_options = {"sample_rate": 0, "profile_device": 1, "num_tensor_watch": 10}
valid_formats = {"chrome_timeline.json", "memory_flow.svg"}
def __init__(
self,
path: str = "profile",
format: str = CHROME_TIMELINE,
*,
topic=OPERATOR | SCOPE,
align_time=True,
show_operator_name=True
format: str = "chrome_timeline.json",
formats: List[str] = None,
**kwargs
) -> None:
self._path = path
self._format = format
self._options = {
"topic": int(topic),
"align_time": int(align_time),
"show_operator_name": int(show_operator_name),
}
if not formats:
formats = [format]
def __enter__(self):
assert not isinstance(formats, str), "formats expects a list, got str"
for format in formats:
assert format in Profiler.valid_formats, "unsupported format {}".format(
format
)
self._path = path
self._formats = formats
self._options = {}
for opt, optval in Profiler.valid_options.items():
self._options[opt] = int(kwargs.pop(opt, optval))
self._pid = "<PID>"
@property
def path(self):
if len(self._formats) == 0:
format = "<FORMAT>"
elif len(self._formats) == 1:
format = self._formats[0]
else:
format = "{" + ",".join(self._formats) + "}"
return self.format_path(self._path, self._pid, format)
@property
def directory(self):
return self._path
@property
def formats(self):
return list(self._formats)
def start(self):
global _running_profiler
assert _running_profiler is None
_running_profiler = self
self._pid = os.getpid()
start_profile(self._options)
return self
def __exit__(self, val, tp, trace):
stop_profile(self._path, self._format)
# dump is async, so it's necessary to sync the interpreter
def stop(self):
global _running_profiler
assert _running_profiler is self
_running_profiler = None
sync()
self._dump_callback = stop_profile()
self._pid = os.getpid()
_living_profilers.add(self)
def dump(self):
if self._dump_callback is not None:
if not os.path.exists(self._path):
os.makedirs(self._path)
if not os.path.isdir(self._path):
get_logger().warning(
"{} is not a directory, cannot write profiling results".format(
self._path
)
)
return
for format in self._formats:
path = self.format_path(self._path, self._pid, format)
get_logger().info("process {} generating {}".format(self._pid, format))
self._dump_callback(path, format)
get_logger().info("profiling results written to {}".format(path))
self._dump_callback = None
_living_profilers.remove(self)
def format_path(self, path, pid, format):
return os.path.join(path, "{}.{}".format(pid, format))
def __enter__(self):
self.start()
def __exit__(self, val, tp, trace):
self.stop()
def __call__(self, func):
def wrapper(*args, **kwargs):
with self:
return func(*args, **kwargs)
func = super().__call__(func)
func.__profiler__ = self
return func
return wrapper
def __del__(self):
self.dump()
@contextmanager
......@@ -94,16 +163,77 @@ def scope(name):
pop_scope(name)
profile = Profiler
def profile(*args, **kwargs):
if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
return Profiler()(args[0])
return Profiler(*args, **kwargs)
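
# A minimal sketch of the two ways `profile` can be used (`step` is a
# hypothetical function): as a bare decorator, or as a factory taking
# Profiler arguments.
#
# @profile
# def step():
#     ...
#
# @profile("profile_dir", formats=["chrome_timeline.json"])
# def step():
#     ...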
def merge_trace_events(directory: str):
names = filter(
lambda x: re.match(r"\d+\.chrome_timeline\.json", x), os.listdir(directory)
)
def load_trace_events(name):
with open(os.path.join(directory, name), "r", encoding="utf-8") as f:
return json.load(f)
def find_metadata(content):
if isinstance(content, dict):
assert "traceEvents" in content
content = content["traceEvents"]
if len(content) == 0:
return None
assert content[0]["name"] == "Metadata"
return content[0]["args"]
contents = list(map(load_trace_events, names))
metadata_list = list(map(find_metadata, contents))
min_local_time = min(
map(lambda x: x["localTime"], filter(lambda x: x is not None, metadata_list))
)
events = []
for content, metadata in zip(contents, metadata_list):
local_events = content["traceEvents"]
if len(local_events) == 0:
continue
local_time = metadata["localTime"]
time_shift = local_time - min_local_time
for event in local_events:
if "ts" in event:
event["ts"] = int(event["ts"] + time_shift)
events.extend(filter(lambda x: x["name"] != "Metadata", local_events))
result = {
"traceEvents": events,
}
path = os.path.join(directory, "merge.chrome_timeline.json")
with open(path, "w") as f:
json.dump(result, f, ensure_ascii=False, separators=(",", ":"))
get_logger().info("profiling results written to {}".format(path))
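
# Illustrative usage (assuming each process wrote its own
# <pid>.chrome_timeline.json into "profile_dir"):
#
# merge_trace_events("profile_dir")
# # -> profile_dir/merge.chrome_timeline.json, with all timestamps
# #    shifted onto the earliest process-local clock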
def is_profiling():
return _running_profiler is not None
def _stop_current_profiler():
global _running_profiler
if _running_profiler is not None:
_running_profiler.stop()
living_profilers = [*_living_profilers]
for profiler in living_profilers:
profiler.dump()
def merge_trace_events(sources: List[str], target: str):
names = list(map(lambda x: x + ".chrome_timeline.json", sources))
result = []
for name in names:
with open(name, "r", encoding="utf-8") as f:
content = json.load(f)
for entry in content:
result.append(entry)
with open(target + ".chrome_timeline.json", "w") as f:
json.dump(result, f, ensure_ascii=False, indent=4)
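
# Illustrative usage for this variant: concatenate per-source traces
# into a single file (source names are hypothetical):
#
# merge_trace_events(["worker0", "worker1"], "merged")
# # reads worker0.chrome_timeline.json and worker1.chrome_timeline.json,
# # writes merged.chrome_timeline.json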
_atexit(_stop_current_profiler)
......@@ -13,6 +13,7 @@
#include "megbrain/common.h"
#include "megbrain/imperative/ops/utility.h"
#include "megbrain/imperative/ops/backward_graph.h"
#include "megbrain/imperative/profiler.h"
#include "megbrain/opr/io.h"
#include "./tensor.h"
......@@ -927,9 +928,23 @@ void init_tensor(py::module m) {
m.def("pop_scope",
[](std::string name) { interpreter_for_py->pop_scope(name); });
m.def("start_profile",
[](std::unordered_map<std::string, int> option) { return interpreter_for_py->start_profile(option); });
[](imperative::Profiler::options_t options) {
interpreter_for_py->sync();
imperative::Profiler::load_options(std::move(options));
imperative::Profiler::start_profile();
interpreter_for_py->start_profile();
});
m.def("stop_profile",
[](std::string basename, std::string format) { interpreter_for_py->stop_profile(basename, format); });
[]() -> std::function<void(std::string, std::string)> {
interpreter_for_py->stop_profile();
interpreter_for_py->sync();
imperative::Profiler::stop_profile();
auto results = imperative::Profiler::collect();
auto options = imperative::Profiler::get_options();
return [results=std::move(results), options=std::move(options)](std::string basename, std::string format){
imperative::Profiler::dump_profile(basename, format, results, options);
};
});
m.def("sync",
[]() {
interpreter_for_py->sync();
......
......@@ -8,6 +8,7 @@
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
import json
import os
import tempfile
import pytest
......@@ -28,15 +29,18 @@ class Simple(Module):
def test_profiler():
profile_prefix = "pytest_profile"
tempdir = tempfile.NamedTemporaryFile()
profile_prefix = tempdir.name
profile_format = "chrome_timeline.json"
profile_path = "{}.{}".format(profile_prefix, profile_format)
profile_path = os.path.join(
profile_prefix, "{}.{}".format(os.getpid(), profile_format)
)
with option("enable_host_compute", 0):
with Profiler(profile_prefix, format=profile_format):
with scope("my_scope"):
oup = Simple()(tensor([1.23], dtype="float32"))
with open(profile_path, "r") as f:
events = json.load(f)
os.remove(profile_path)
prev_ts = {}
scope_count = 0
for event in events:
......
......@@ -13,11 +13,14 @@
#include <string>
#include <variant>
#include <unordered_set>
#include "megbrain/tensor.h"
#include "megbrain/imperative/op_def.h"
#include "megbrain/imperative/utils/to_string.h"
#include "./tensor_info.h"
namespace mgb::imperative {
namespace interpreter::intl {
......@@ -43,7 +46,7 @@ struct Put {
};
struct ApplyOp {
uint64_t id;
uint64_t id; // used by profiler to identify a unique apply
std::shared_ptr<OpDef> op;
SmallVector<TensorInfo*> inputs;
SmallVector<TensorInfo*> outputs;
......@@ -143,7 +146,7 @@ struct SetOption {
};
struct StartProfile {
InterpreterProfiler* profiler;
std::unordered_set<TensorInfo*> capture_tensors;
template <typename TFunctor>
void get_props(TFunctor&& functor) const {}
......@@ -154,14 +157,10 @@ struct StartProfile {
};
struct StopProfile {
std::string basename;
std::string format;
std::unordered_set<TensorInfo*> escape_tensors;
template <typename TFunctor>
void get_props(TFunctor&& functor) const {
functor("basename", basename);
functor("format", format);
}
void get_props(TFunctor&& functor) const {}
const char* get_name() const {
return "StopProfile";
......
......@@ -20,19 +20,17 @@
#include "megbrain/imperative/ops/opr_attr.h"
#include "megbrain/imperative/utils/to_string.h"
#include "../event_pool.h"
#include "../op_trait.h"
using namespace mgb;
using namespace imperative;
using namespace interpreter;
using namespace interpreter::intl;
#define RECORD_EVENT(type, ...) \
if (state.profiler->is_profiling()) { \
state.profiler->record_host<type>(type{__VA_ARGS__}); \
} \
#define RECORD_DEVICE_EVENT(type, device, ...) \
if (state.profiler->is_profiling()) { \
state.profiler->record_device<type>((device), type{__VA_ARGS__}); \
if (Profiler::is_profiling()) { \
Profiler::record<type>(type{__VA_ARGS__}); \
} \
......@@ -46,6 +44,10 @@ namespace {
};
}
namespace mgb {
using namespace profiler;
}
std::thread::id ChannelImpl::get_worker_tid() {
return m_worker_state.tid;
}
......@@ -60,6 +62,7 @@ ChannelImpl::WorkerState& ChannelImpl::get_worker_state() {
return m_worker_state;
}
// Do not use m_xxx_state directly
#define m_channel_state
#define m_worker_state
......@@ -74,10 +77,16 @@ Interpreter& Interpreter::inst() {
Handle ChannelImpl::put(const HostTensorND& value, bool no_cache) {
mgb_assert(check_available(), "Channel already closed");
auto& state = get_channel_state();
state.scopes.push("Put");
auto info = put_impl(value, no_cache);
state.scopes.pop("Put");
return info;
}
TensorInfo* ChannelImpl::put_impl(const HostTensorND& value, bool no_cache) {
auto info = alloc();
info->desc.layout = value.layout();
info->desc.comp_node = value.comp_node();
info->desc.value = value.proxy_to_default_cpu();
init(info, {value.layout(), value.comp_node(), value.proxy_to_default_cpu()});
info->h_value = value;
m_buffer.enqueue(Put{info, value, no_cache});
if (m_async_level == 0) {
......@@ -90,11 +99,15 @@ Handle ChannelImpl::put(const HostTensorND& value, bool no_cache) {
Handle ChannelImpl::put(const DeviceTensorND& data) {
auto& state = get_channel_state();
mgb_assert(check_available(), "Channel already closed");
state.scopes.push("Put");
auto info = alloc();
info->desc.layout = data.layout();
info->desc.comp_node = data.comp_node();
RECORD_EVENT(TensorCommandEvent, info->id, TensorCommandEvent::Put);
init(info, {data.layout(), data.comp_node()});
info->ptr = Tensor::make(data);
RECORD_EVENT(TensorProduceEvent, info->id, info->desc.layout, info->desc.comp_node);
RECORD_EVENT(TensorProduceEvent, info->id, info->desc.layout, info->desc.comp_node, data.raw_ptr());
info->status = TensorInfo::Produced;
RECORD_EVENT(TensorCommandFinishEvent, info->id, TensorCommandFinishEvent::Put);
state.scopes.pop("Put");
return info;
}
......@@ -148,7 +161,7 @@ void ChannelImpl::dispatch_default_cpu(
SmallVector<Handle>* outputs) {
auto& state = get_channel_state();
auto [output_descs, validated] = OpDef::infer_output_attrs_fallible(*op, input_descs);
MGB_MARK_USED_VAR(validated);
RECORD_EVENT(ShapeInferEvent, validated);
SmallVector<DeviceTensorND> input_tensornds;
input_tensornds.reserve(input_descs.size());
......@@ -166,6 +179,7 @@ void ChannelImpl::dispatch_default_cpu(
if (info->ptr && info->ptr->try_get_value()) {
input_tensornds.emplace_back(info->ptr->get_value().proxy_to_default_cpu());
} else {
// It's OK for SwapOut. We assign h_value before dropping ptr
mgb_assert(!info->h_value.empty(), "inp->h_value is empty!");
input_tensornds.emplace_back(info->h_value.proxy_to_default_cpu());
}
......@@ -182,8 +196,7 @@ void ChannelImpl::dispatch_default_cpu(
output_tensornds.emplace_back(HostTensorND(output_cn, desc.layout).proxy_to_default_cpu());
}
auto apply_id = ++m_last_id;
RECORD_EVENT(OpExecuteEvent, apply_id, op, tinfo_to_tid(input_infos), {});
uint64_t op_id = Profiler::next_id();
OpDef::apply_on_device_tensornd(*op, input_tensornds, &output_tensornds);
......@@ -193,14 +206,20 @@ void ChannelImpl::dispatch_default_cpu(
HostTensorND host_tensornd = HostTensorND::make_proxy(tensornd)
.proxy_to_comp_node(output_cn);
// use `put` for consistency
auto info = reinterpret_cast<TensorInfo*>(put(host_tensornd, false));
auto info = reinterpret_cast<TensorInfo*>(put_impl(host_tensornd, false));
mgb_assert(info->desc.layout.ndim != 0);
output_infos.push_back(info);
outputs->push_back(info);
}
RECORD_EVENT(OpExecuteFinishEvent, apply_id, op,
tinfo_to_tid(input_infos), tinfo_to_tid(output_infos));
auto op_info_getter = [op]{
std::unordered_map<std::string, std::string> op_info;
auto props = OpDef::props(*op);
for (auto&& [key, value]: props) {
op_info[key] = value;
}
return op_info;
};
RECORD_EVENT(OpDispatchEvent, op_id, op->trait()->name, op_info_getter, tinfo_to_tid(input_infos), tinfo_to_tid(output_infos));
}
void ChannelImpl::dispatch_kernel(
......@@ -209,15 +228,22 @@ void ChannelImpl::dispatch_kernel(
const SmallVector<LogicalTensorDesc>& input_descs,
SmallVector<Handle>* outputs) {
auto& state = get_channel_state();
auto& options = state.options;
auto name = op->trait()->make_name(*op);
state.scopes.push(name);
auto [output_descs, validated] = OpDef::infer_output_attrs_fallible(*op, input_descs);
RECORD_EVENT(ShapeInferEvent, validated);
ApplyOp cmd{++m_last_id, std::move(op)};
ApplyOp cmd{Profiler::next_id(), std::move(op)};
cmd.inputs = std::move(input_infos);
cmd.outputs.reserve(output_descs.size());
outputs->reserve(output_descs.size());
for (auto&& desc : output_descs) {
for (size_t i = 0; i < output_descs.size(); ++i) {
auto&& desc = output_descs[i];
auto info = alloc();
info->desc = desc;
init(info, desc);
// make sure desc's value is consistent with h_value
if (!info->desc.value.empty()) {
info->h_value = HostTensorND::make_proxy(desc.value)
......@@ -226,10 +252,19 @@ void ChannelImpl::dispatch_kernel(
cmd.outputs.push_back(info);
outputs->push_back(info);
}
auto op_info_getter = [op=cmd.op]{
std::unordered_map<std::string, std::string> op_info;
auto props = OpDef::props(*op);
for (auto&& [key, value]: props) {
op_info[key] = value;
}
return op_info;
};
RECORD_EVENT(OpDispatchEvent, cmd.id, cmd.op->trait()->name, op_info_getter, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
m_buffer.enqueue(std::move(cmd));
if (!validated && state.options.async_level == 1) {
if (!validated && options.async_level == 1) {
sync();
} else if (state.options.async_level == 0) {
} else if (options.async_level == 0) {
sync();
// check device error
for (auto&& oup : *outputs) {
......@@ -237,6 +272,7 @@ void ChannelImpl::dispatch_kernel(
info->ptr->comp_node().sync();
}
}
state.scopes.pop(name);
}
SmallVector<Handle> ChannelImpl::apply_op(
......@@ -282,31 +318,12 @@ SmallVector<Handle> ChannelImpl::apply_op(
HostTensorND ChannelImpl::get_value(Handle handle) {
mgb_assert(check_available(), "Channel already closed");
auto& state = get_channel_state();
// TODO: maybe get_value should be done on host. i.e. delete GetValue
mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
"invalid handle: %p", handle);
auto info = reinterpret_cast<TensorInfo*>(handle);
mgb_assert(!m_waitee);
// do not use info->value_fetched, it's unsafe
mgb_assert(!info->invalid, "Invalid tensor, unable to get_value!");
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
TensorPtr tensor_ptr = info->ptr;
auto value_fetched = [&]() {
return tensor_ptr && tensor_ptr->value_fetched();
};
if (!value_fetched()) {
m_waitee = info;
m_buffer.enqueue(GetValue{info});
RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::HostValue);
m_cv.wait(lock, [&]() {
check_worker_exc_unsafe();
tensor_ptr = info->ptr;
return value_fetched();
});
RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::HostValue);
m_waitee = nullptr;
}
return tensor_ptr->get_value();
return wait_tensor(info, TensorProp::HostValue)->get_value();
}
TensorShape ChannelImpl::get_shape(Handle handle) {
......@@ -318,18 +335,7 @@ TensorShape ChannelImpl::get_shape(Handle handle) {
if (info->desc.layout.ndim != 0) {
return info->desc.layout;
}
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
mgb_assert(!m_waitee);
m_waitee = info;
m_buffer.flush();
RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::Shape);
m_cv.wait(lock, [&]() {
check_worker_exc_unsafe();
return static_cast<bool>(info->ptr);
});
RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::Shape);
m_waitee = nullptr;
TensorShape ret = info->ptr->layout();
TensorShape ret = wait_tensor(info, TensorProp::Shape)->layout();
mgb_assert(ret.ndim != 0);
return ret;
}
......@@ -340,7 +346,7 @@ DType ChannelImpl::get_dtype(Handle handle) {
mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
"invalid handle: %p", handle);
auto info = reinterpret_cast<TensorInfo*>(handle);
RECORD_EVENT(TensorGetPropEvent, info->id, TensorInfo::DType);
RECORD_EVENT(TensorGetPropEvent, info->id, TensorProp::DType);
auto ret = info->desc.layout.dtype;
mgb_assert(ret.valid());
return ret;
......@@ -352,7 +358,7 @@ CompNode ChannelImpl::get_device(Handle handle) {
mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
"invalid handle: %p", handle);
auto info = reinterpret_cast<TensorInfo*>(handle);
RECORD_EVENT(TensorGetPropEvent, info->id, TensorInfo::Device);
RECORD_EVENT(TensorGetPropEvent, info->id, TensorProp::Device);
auto ret = info->desc.comp_node;
mgb_assert(ret.valid());
return ret;
......@@ -364,28 +370,14 @@ DeviceTensorND ChannelImpl::get_dev_tensor(Handle handle) {
mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
"invalid handle: %p", handle);
auto info = reinterpret_cast<TensorInfo*>(handle);
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
mgb_assert(!m_waitee);
m_waitee = info;
m_buffer.flush();
RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::DevValue);
m_cv.wait(lock, [&]() {
check_worker_exc_unsafe();
return static_cast<bool>(info->ptr);
});
RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::DevValue);
m_waitee = nullptr;
return info->ptr->dev_tensor();
return wait_tensor(info, TensorProp::DevValue)->dev_tensor();
}
void ChannelImpl::sync() {
mgb_assert(check_available(), "Channel already closed");
auto& state = get_channel_state();
m_buffer.flush();
RECORD_EVENT(SyncEvent);
m_worker.wait_all_task_finish();
CompNode::sync_all();
RECORD_EVENT(SyncFinishEvent);
MGB_LOCK_GUARD(m_mutex);
check_worker_exc_unsafe();
}
......@@ -419,14 +411,24 @@ void ChannelImpl::set_option(std::string name, size_t value) {
TensorInfo* ChannelImpl::alloc() {
auto& state = get_channel_state();
auto info = [this]{
MGB_LOCK_GUARD(m_mutex);
auto info = m_pool.alloc();
m_valid_handle.insert(info);
info->id = m_last_id++;
RECORD_EVENT(TensorDeclareEvent, info->id);
return m_pool.alloc();
}();
info->id = Profiler::next_id();
if (Profiler::is_profiling()) {
info->name = state.scopes.next_tensor_name();
}
return info;
}
void ChannelImpl::init(TensorInfo* info, LogicalTensorDesc desc) {
m_valid_handle.insert(info);
RECORD_EVENT(TensorDeclareEvent, info->id, info->name);
info->status = TensorInfo::Allocated;
info->desc = std::move(desc);
}
void ChannelImpl::do_drop(TensorInfo* ptr, bool user=false) {
if (!ptr->producer) {
......@@ -439,6 +441,7 @@ void ChannelImpl::do_drop(TensorInfo* ptr, bool user=false) {
return;
}
ptr->evict_type = EvictType::DROP;
ptr->status = TensorInfo::Dropped;
release_tensor(ptr);
}
......@@ -460,7 +463,8 @@ void ChannelImpl::free(TensorInfo* ptr) {
}
void ChannelImpl::recursive_free(TensorInfo* ptr) {
SmallVector<TensorInfo*> inps(0);
RECORD_EVENT(TensorCommandEvent, ptr->id, TensorCommandEvent::RecFree);
SmallVector<TensorInfo*> inps;
if (ptr->producer) {
for (auto i : ptr->producer->inputs) {
if (i && --i->ref_cnt == 0) {
......@@ -474,17 +478,23 @@ void ChannelImpl::recursive_free(TensorInfo* ptr) {
recursive_free(i);
}
}
RECORD_EVENT(TensorCommandFinishEvent, ptr->id, TensorCommandFinishEvent::RecFree);
}
void ChannelImpl::real_free(TensorInfo* ptr) {
auto& state = get_worker_state();
MGB_LOCK_GUARD(m_mutex);
RECORD_EVENT(TensorEraseEvent, ptr->id);
if (ptr->size_exceeds_thd(state.options.dtr_evictee_minimum_size)) {
m_dtr.erase_candidate(ptr);
}
detach_users(ptr);
ptr->detach_producer();
bool has_value = ptr->ptr != nullptr;
if (has_value) {
RECORD_EVENT(TensorReleaseEvent, ptr->id);
}
RECORD_EVENT(TensorEraseEvent, ptr->id, ptr->ptr_use_count);
ptr->status = TensorInfo::Deleted;
m_pool.free(ptr);
}
......@@ -496,46 +506,48 @@ ChannelImpl::~ChannelImpl() {
void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr, bool notice=true) {
auto& state = get_worker_state();
auto lock = std::unique_lock<std::mutex>(m_mutex, std::defer_lock);
std::unique_lock<std::mutex> lock{m_mutex, std::defer_lock};
if (notice) {
lock.lock();
}
m_dtr.update_used_time(dest);
if (notice) {
RECORD_EVENT(TensorProduceEvent, dest->id, ptr->layout(), ptr->comp_node());
}
dest->value_fetched = ptr->value_fetched();
RECORD_EVENT(TensorProduceEvent, dest->id, ptr->layout(), ptr->comp_node(), ptr->dev_tensor().raw_ptr());
// update tensor desc for static infer
dest->desc.layout = ptr->layout();
dest->desc.comp_node = ptr->comp_node();
dest->memory = ptr->blob()->size();
dest->ptr = std::move(ptr);
dest->evict_type = EvictType::NONE;
dest->status = TensorInfo::Produced;
if (notice && dest->size_exceeds_thd(state.options.dtr_evictee_minimum_size)) {
m_dtr.insert_candidate(dest);
}
if (notice && m_waitee == dest) {
m_cv.notify_all();
if (notice) {
notify_tensor_unsafe(dest);
}
}
void ChannelImpl::release_tensor(TensorInfo* dest) {
RECORD_EVENT(TensorReleaseEvent, dest->id);
MGB_LOCK_GUARD(m_mutex);
dest->ptr.reset();
}
void ChannelImpl::regenerate(TensorInfo* dest) {
RECORD_EVENT(TensorCommandEvent, dest->id, TensorCommandEvent::ReGen);
if (dest->evict_type == EvictType::DROP) {
recompute(dest->producer);
} else if (dest->evict_type == EvictType::SWAP) {
produce_tensor(dest, Tensor::make(dest->h_value));
}
RECORD_EVENT(TensorCommandFinishEvent, dest->id, TensorCommandFinishEvent::ReGen);
}
void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
using namespace ranges;
using namespace ranges::views;
auto& state = get_worker_state();
bool profiling_device = Profiler::is_profiling() && Profiler::get_option("profile_device", 0);
uint64_t apply_id = cmd.id;
SmallVector<TensorPtr> tensor_inputs;
if (state.options.enable_dtr_auto_drop) {
......@@ -545,33 +557,50 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
if (!i->ptr && i->evict_type != EvictType::NONE) {
regenerate(i);
}
// inputs.push_back(i->ptr);
m_dtr.update_used_time(i);
}
tensor_inputs.reserve(cmd.inputs.size());
// refcnt == 1, owners: [TensorInfo::ptr]
for (auto i : cmd.inputs) {
mgb_assert(i->ptr, "Invalid input tensor ptr!");
// refcnt ++, owners: [i->ptr, tensor_inputs]
tensor_inputs.push_back(i->ptr);
}
RECORD_EVENT(OpExecuteEvent, apply_id);
// Begin profiling operator
SmallVector<std::pair<CompNode, uint64_t>> kernels;
if (profiling_device) {
// Collecting devices
SmallVector<CompNode> devices;
if (state.profiler->is_profiling()) {
for (auto&& i : concat(cmd.inputs, cmd.outputs)) {
if (i != nullptr && count(devices, i->desc.comp_node) == 0) {
devices.push_back(i->desc.comp_node);
kernels.push_back({i->desc.comp_node, Profiler::next_id()});
}
}
}
for (auto* input: cmd.inputs) {
auto input_id = input->id;
RECORD_EVENT(OpInputEvent, input_id);
RECORD_EVENT(TensorUsageEvent, input_id);
RECORD_EVENT(OpInputFinishEvent, input_id);
}
// Fused by command buffer. @see: CommandBuffer::fuse_del
// Now if dest is inplacable, its refcnt will be decreased to 1 and it will be owned by tensor_inputs after Del.
// Note: for exprs like 'y = x op x', inplace is not supported yet, but Del will still be fused.
for (auto* del : cmd.dels) {
// refcnt --, owners: [tensor_inputs]
// if it's decreased to 1, it will be detected at @see: proxy_graph_detail::apply_on_physical_tensor
uint64_t del_id = del->id;
RECORD_EVENT(OpDelEvent, del_id);
free(del);
RECORD_EVENT(OpDelFinishEvent, del_id);
}
RECORD_EVENT(OpExecuteEvent, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
for (auto&& device: devices) {
sync_device_scope(device);
RECORD_DEVICE_EVENT(KernelExecuteEvent, device, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
// Before wait
// TODO: split operator wait and execute so that OpWait can be correctly recorded.
// Before execute
for (auto&& [device, kernel_id]: kernels) {
RECORD_EVENT(KernelExecuteEvent, apply_id, kernel_id, Timer::record_event(device));
}
if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) {
auto_evict();
......@@ -579,20 +608,26 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
// Apply op
// Here std::move is REQUIRED for removing duplicated references.
auto tensor_outputs = OpDef::apply_on_physical_tensor(
*cmd.op, tensor_inputs);
*cmd.op, std::move(tensor_inputs));
// After execute
for (auto&& device : devices) {
RECORD_DEVICE_EVENT(KernelExecuteFinishEvent, device, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
for (auto&& [device, kernel_id]: kernels) {
RECORD_EVENT(KernelExecuteFinishEvent, apply_id, kernel_id, Timer::record_event(device));
}
RECORD_EVENT(OpExecuteFinishEvent, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
// End profiling operator
mgb_assert(tensor_outputs.size() == cmd.outputs.size());
for (size_t i = 0; i < tensor_outputs.size(); ++i) {
auto output = cmd.outputs[i];
if (output != nullptr && output->ptr == nullptr) {
if (output == nullptr) {
RECORD_EVENT(OpOutputEvent, 0);
RECORD_EVENT(OpOutputFinishEvent, 0);
} else if (output->ptr != nullptr) {
RECORD_EVENT(OpOutputEvent, output->id);
RECORD_EVENT(OpOutputFinishEvent, output->id);
} else {
RECORD_EVENT(OpOutputEvent, output->id);
produce_tensor(output, tensor_outputs[i]);
RECORD_EVENT(OpOutputFinishEvent, output->id);
sample_on_device(output->desc.comp_node, false);
}
}
......@@ -612,6 +647,8 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
}
m_dtr.unpin(cmd.inputs);
}
RECORD_EVENT(OpExecuteFinishEvent, apply_id);
// End profiling operator
}
void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
......@@ -637,6 +674,7 @@ void ChannelImpl::auto_evict() {
}
size_t current_memory = m_dtr.comp_node.get_used_memory();
while (current_memory > state.options.dtr_eviction_threshold) {
sample_on_device(m_dtr.comp_node, false);
auto best = m_dtr.find_best_tensor();
if (!best) {
if (!m_dtr.warn_printed) {
......@@ -656,6 +694,7 @@ void ChannelImpl::auto_evict() {
if (best->evict_type == EvictType::DROP) {
m_dtr.update_dsu_after_evict(best);
}
sample_on_device(m_dtr.comp_node, false);
}
}
......@@ -665,6 +704,10 @@ void ChannelImpl::detach_users(TensorInfo* dest) {
SmallVector<TensorInfo*> outputs = user->outputs;
SmallVector<TensorInfo*> inputs = user->inputs;
for (auto* output: outputs) {
// When a `ComputePath` is detached from its input,
// there is no need to keep it,
// so we detach all outputs of this path
// to decrease its `ref_cnt` to zero.
if (output == nullptr) {
continue;
}
......@@ -674,63 +717,79 @@ void ChannelImpl::detach_users(TensorInfo* dest) {
input->ref_cnt --;
}
}
// now user is dead
}
mgb_assert(dest->users.size() == 0);
//dest->users.clear();
mgb_assert(dest->users.empty(), "ComputePath leaking");
}
bool ChannelImpl::check_available() {
return !m_closed;
}
void ChannelImpl::sync_device_scope(CompNode device) {
auto& state = get_worker_state();
auto& prev = state.device_scope_map[device];
auto& current = state.scopes;
auto push_scope = [&](std::string name) {
RECORD_DEVICE_EVENT(DeviceScopeEvent, device, name);
};
auto pop_scope = [&](std::string name) {
RECORD_DEVICE_EVENT(DeviceScopeFinishEvent, device, name);
};
size_t similarity = 0;
for (size_t i = 0; i < prev.size() && i < current.size(); i++) {
if (prev[i] == current[i]) {
similarity++;
TensorPtr ChannelImpl::wait_tensor(TensorInfo* info, TensorProp prop) {
m_buffer.flush();
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
mgb_assert(!m_waitee, "duplicate waitee");
m_waitee = info;
m_waitee_id = Profiler::next_id();
RECORD_EVENT(TensorWaitPropEvent, info->id, m_waitee_id, prop);
bool require_host = prop == TensorProp::HostValue;
bool value_fetching = false;
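// Lazily enqueue GetValue: only a HostValue wait needs the worker to
// fetch the host value; other props just wait until info->ptr is set.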
m_cv.wait(lock, [&]() {
check_worker_exc_unsafe();
if (require_host) {
if (info->ptr && info->ptr->value_fetched()) {
return true;
}
if (!value_fetching) {
m_buffer.enqueue(GetValue{info});
value_fetching = true;
}
return false;
} else {
break;
return static_cast<bool>(info->ptr);
}
});
RECORD_EVENT(TensorWaitPropFinishEvent, info->id, m_waitee_id, prop, m_waitee == nullptr);
if (m_waitee != nullptr) {
mgb_assert(m_waitee == info, "waitee mismatch");
m_waitee = nullptr;
}
while (prev.size() > similarity) {
pop_scope(prev.back());
prev.pop_back();
return info->ptr;
}
void ChannelImpl::notify_tensor_unsafe(TensorInfo* info) {
if (info == m_waitee) {
m_waitee = nullptr;
RECORD_EVENT(TensorNotifyPropEvent, info->id);
m_cv.notify_all();
}
while (prev.size() < current.size()) {
prev.push_back(current[prev.size()]);
push_scope(prev.back());
}
std::unordered_set<TensorInfo*> ChannelImpl::collect_valid_tensors() {
std::unordered_set<TensorInfo*> valid_tensors;
for (auto* handle: m_valid_handle) {
auto* info = reinterpret_cast<TensorInfo*>(handle);
valid_tensors.insert(info);
//TODO: valid_tensors.insert({info, info->status});
}
return valid_tensors;
}
void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
using namespace ranges;
using namespace ranges::views;
auto& state = get_worker_state();
RECORD_EVENT(CommandExecuteEvent, icmd);
bool finished = false;
auto do_finish_command = [&]{
if (finished) {
return;
}
RECORD_EVENT(CommandFinishEvent, icmd);
finished = true;
};
auto& options = state.options;
// TODO: remove std::visit to support macOS 10.12
auto cmd_visitor = [&](const auto& cmd) {
using T = std::decay_t<decltype(cmd)>;
if constexpr (std::is_same_v<T, Put>) {
RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::Put);
auto value = cmd.no_cache ? std::make_shared<Tensor>(cmd.value) : Tensor::make(cmd.value);
produce_tensor(cmd.dest, std::move(value));
RECORD_EVENT(TensorCommandFinishEvent, cmd.dest->id, TensorCommandFinishEvent::Put);
sample_on_device(cmd.dest->desc.comp_node, false);
} else if constexpr (std::is_same_v<T, ApplyOp>) {
do_apply_op(cmd);
for (size_t i = 0; i < cmd.outputs.size(); ++i) {
......@@ -739,7 +798,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
continue;
}
if (state.options.enable_dtr_auto_drop) {
cmd.outputs[i]->dsu_ptr = std::make_shared<DsuNode>(output->compute_time);
output->dsu_ptr = std::make_shared<DsuNode>(output->compute_time);
}
}
if (state.options.enable_drop && state.options.record_computing_path) {
......@@ -765,6 +824,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
bool cross_cn = any_of(concat(cmd.inputs, cmd.outputs), is_cross_cn);
bool inplace = any_of(cartesian_product(cmd.inputs, cmd.outputs), is_inplace);
if (!inplace && !cross_cn && !m_dtr.is_bad_op(get_name(*cmd.op))) {
TensorInfo::ComputePath::make(cmd.id, cmd.op, cmd.inputs, cmd.outputs);
size_t detach_cnt = 0;
......@@ -780,7 +840,12 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
}
}
} else if constexpr (std::is_same_v<T, Del>) {
RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::Del);
CompNode device = cmd.dest->desc.comp_node;
uint64_t tensor_id = cmd.dest->id;
free(cmd.dest);
RECORD_EVENT(TensorCommandFinishEvent, tensor_id, TensorCommandFinishEvent::Del);
sample_on_device(device, false);
} else if constexpr (std::is_same_v<T, GetValue>) {
if (!cmd.dest->ptr && cmd.dest->evict_type != EvictType::NONE) {
regenerate(cmd.dest);
......@@ -788,50 +853,62 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
mgb_assert(cmd.dest->ptr, "Invalid tensor ptr!");
cmd.dest->ptr->fetch_value();
MGB_LOCK_GUARD(m_mutex);
cmd.dest->value_fetched = true;
if (m_waitee == cmd.dest) {
m_cv.notify_all();
}
notify_tensor_unsafe(cmd.dest);
} else if constexpr (std::is_same_v<T, SwapIn>) {
RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::SwapIn);
produce_tensor(cmd.dest, Tensor::make(cmd.dest->h_value));
RECORD_EVENT(TensorCommandFinishEvent, cmd.dest->id, TensorCommandFinishEvent::SwapIn);
sample_on_device(cmd.dest->desc.comp_node, false);
} else if constexpr (std::is_same_v<T, SwapOut>) {
RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::SwapOut);
cmd.dest->h_value = cmd.dest->ptr->get_value();
if (cmd.dest->evict_type == EvictType::NONE) {
release_tensor(cmd.dest);
cmd.dest->evict_type = EvictType::SWAP;
cmd.dest->status = TensorInfo::Swapped;
release_tensor(cmd.dest);
}
RECORD_EVENT(TensorCommandFinishEvent, cmd.dest->id, TensorCommandFinishEvent::SwapOut);
sample_on_device(cmd.dest->desc.comp_node, false);
} else if constexpr (std::is_same_v<T, Drop>) {
RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::Drop);
do_drop(cmd.dest, true);
RECORD_EVENT(TensorCommandFinishEvent, cmd.dest->id, TensorCommandFinishEvent::Drop);
} else if constexpr (std::is_same_v<T, SetOption>) {
state.options.set_option(cmd.key, cmd.value);
options.set_option(cmd.key, cmd.value);
} else if constexpr (std::is_same_v<T, StartProfile>) {
RECORD_EVENT(StartProfileEvent);
CompNode::sync_all();
state.profiler.reset(cmd.profiler);
for (auto* info: cmd.capture_tensors) {
RECORD_EVENT(TensorDeclareEvent, info->id, info->name);
if (info->status == TensorInfo::Produced) {
// TODO: handle swap/drop
RECORD_EVENT(TensorProduceEvent, info->id, info->desc.layout, info->desc.comp_node, info->ptr->dev_tensor().raw_ptr());
}
}
CompNode::foreach([&](CompNode device){
if (Profiler::get_option("sample_rate", 0)) {
sample_on_device(device, true);
}
});
RECORD_EVENT(StartProfileFinishEvent);
} else if constexpr (std::is_same_v<T, StopProfile>) {
for (auto&& [device, scopes]: state.device_scope_map) {
MGB_MARK_USED_VAR(scopes);
sync_device_scope(device);
}
do_finish_command();
auto profiler = std::make_unique<InterpreterProfiler>();
std::swap(profiler, state.profiler);
auto records = profiler->stop();
auto worker_tid = get_worker_tid();
auto host_map = [worker_tid](std::thread::id tid) {
if (tid == worker_tid) {
return "worker";
} else {
return "unknown";
RECORD_EVENT(StopProfileEvent);
for (auto* info: cmd.escape_tensors) {
bool has_value = info->status == TensorInfo::Produced;
if (has_value) {
RECORD_EVENT(TensorReleaseEvent, info->id);
}
};
RECORD_EVENT(TensorEraseEvent, info->id);
}
CompNode::foreach([&](CompNode device){
if (Profiler::get_option("sample_rate", 0)) {
sample_on_device(device, true);
}
});
RECORD_EVENT(StopProfileFinishEvent);
} else if constexpr (std::is_same_v<T, PushScope>) {
state.scopes.push_back(cmd.scope_name);
do_finish_command();
RECORD_EVENT(ScopeEvent, cmd.scope_name);
} else if constexpr (std::is_same_v<T, PopScope>) {
mgb_assert(state.scopes.back() == cmd.scope_name, "scope name mismatch");
state.scopes.pop_back();
do_finish_command();
RECORD_EVENT(ScopeFinishEvent, cmd.scope_name);
} else {
static_assert(!std::is_same_v<T, T>);
......@@ -839,7 +916,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
};
std::visit([&](const auto& cmd){
using T = std::decay_t<decltype(cmd)>;
if (!state.options.catch_worker_execption) {
if (!options.catch_worker_execption) {
cmd_visitor(cmd);
return;
}
......@@ -855,10 +932,12 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
cmd.dest->invalid = true;
}
m_worker_exc = std::current_exception();
m_cv.notify_all();
RECORD_EVENT(WorkerExceptionEvent);
if (m_waitee) {
notify_tensor_unsafe(m_waitee);
}
}
}, icmd.second);
do_finish_command();
}
void ChannelImpl::check_worker_exc_unsafe() {
......@@ -888,17 +967,17 @@ void ChannelImpl::CommandBuffer::flush() {
void ChannelImpl::CommandBuffer::flush(Handle pos) {
auto& state = m_owner->get_channel_state();
for (auto iter = m_commands.begin(); iter != pos; ++iter) {
// mgb_log_debug("%s Flushed", to_string(*iter).c_str());
IdentifiedCommand icmd{++m_owner->m_last_id, std::move(*iter)};
RECORD_EVENT(CommandEnqueueEvent, icmd);
m_owner->m_worker.add_task(std::move(icmd));
if (Profiler::is_profiling()) {
mgb_log_debug("%s Flushed", to_string(*iter).c_str());
}
m_owner->m_worker.add_task(IdentifiedCommand{Profiler::next_id(), std::move(*iter)});
}
m_commands.erase(m_commands.begin(), pos);
}
auto ChannelImpl::CommandBuffer::flush_pos_for(const Command& cmd) -> Handle {
auto& state = m_owner->get_channel_state();
return std::visit([&, this](const auto& cmd) {
return std::visit([this, &state](const auto& cmd) {
using T = std::decay_t<decltype(cmd)>;
if constexpr (std::is_same_v<T, ApplyOp>) {
auto* op_type = cmd.op->dyn_typeinfo();
......@@ -986,46 +1065,37 @@ auto ChannelImpl::CommandBuffer::find_produce(TensorInfo* dest, Range range)
});
}
void ChannelImpl::start_profile(std::unordered_map<std::string, int> option) {
void ChannelImpl::start_profile() {
mgb_assert(check_available(), "Channel already closed");
auto& state = get_channel_state();
auto profiler_option = InterpreterProfiler::Option::from_dict(option);
auto profiler = std::make_unique<InterpreterProfiler>();
profiler->set_option(profiler_option);
profiler->start(InterpreterProfiler::topic_to_mask(profiler_option.topic));
std::swap(profiler, state.profiler);
m_buffer.enqueue(StartProfile{state.profiler.get()});
auto capture_tensors = collect_valid_tensors();
if (capture_tensors.size() > 0) {
m_buffer.enqueue(StartProfile{std::move(capture_tensors)});
}
}
void ChannelImpl::stop_profile(std::string basename, std::string format) {
void ChannelImpl::stop_profile() {
mgb_assert(check_available(), "Channel already closed");
auto& state = get_channel_state();
m_buffer.flush();
auto profiler = std::make_unique<InterpreterProfiler>();
std::swap(profiler, state.profiler);
profiler.release();
m_buffer.enqueue(StopProfile{basename, format});
auto escape_tensors = collect_valid_tensors();
if (escape_tensors.size() > 0) {
m_buffer.enqueue(StopProfile{std::move(escape_tensors)});
}
}
void ChannelImpl::push_scope(std::string name) {
mgb_assert(check_available(), "Channel already closed");
auto& state = get_channel_state();
state.scopes.push(name);
RECORD_EVENT(ScopeEvent, name);
if (state.profiler->is_profiling()) {
state.scopes.push_back(name);
m_buffer.enqueue(PushScope{name});
}
}
void ChannelImpl::pop_scope(std::string name) {
mgb_assert(check_available(), "Channel already closed");
auto& state = get_channel_state();
state.scopes.pop(name);
RECORD_EVENT(ScopeFinishEvent, name);
if (state.profiler->is_profiling()) {
mgb_assert((!state.scopes.empty()) && state.scopes.back() == name, "scope name mismatch");
state.scopes.pop_back();
m_buffer.enqueue(PopScope{name});
}
}
void ChannelImpl::assert_in_channel() {
......@@ -1036,6 +1106,19 @@ void ChannelImpl::assert_in_worker() {
mgb_assert(get_worker_tid() == std::this_thread::get_id(), "this method can only be called in worker thread");
}
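// Record a device memory sample; unless `force` is set, sampling is
// rate-limited by the "sample_rate" profiler option.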
void ChannelImpl::sample_on_device(CompNode device, bool force) {
if (!force) {
thread_local int last_sample_id = 0;
int sample_rate = Profiler::is_profiling() ? Profiler::get_option("sample_rate", 0) : 0;
if (!sample_rate || ((++last_sample_id) % sample_rate != 0)) {
return;
}
}
RECORD_EVENT(SampleDeviceEvent, device);
auto [total, free] = device.get_mem_status_bytes();
RECORD_EVENT(SampleDeviceFinishEvent, device, total, free);
}
void ChannelImpl::DynamicSublinear::pin(const SmallVector<TensorInfo*>& vec) {
for (auto i : vec) {
i->pin();
......
......@@ -24,10 +24,10 @@
#include "megbrain/imperative/profiler.h"
#include "./commands.h"
#include "./events.h"
#include "./tensor_info.h"
#include "./option_manager.h"
#include "./profiler.h"
#include "../profiler/events.h"
namespace mgb::imperative::interpreter::intl {
......@@ -37,7 +37,6 @@ struct InterpreterImpl : Interpreter {
std::unique_ptr<Channel> create_channel() override;
};
struct ChannelImpl : Interpreter::Channel {
ChannelImpl();
~ChannelImpl() override;
......@@ -67,19 +66,27 @@ struct ChannelImpl : Interpreter::Channel {
size_t get_option(std::string name) override;
void set_option(std::string name, size_t value) override;
void start_profile(std::unordered_map<std::string, int> option) override;
void stop_profile(std::string basename, std::string format) override;
void start_profile() override;
void stop_profile() override;
void push_scope(std::string) override;
void pop_scope(std::string) override;
private:
struct WorkQueue;
struct State;
TensorInfo* alloc();
void init(TensorInfo*, LogicalTensorDesc desc);
void free(TensorInfo*);
void real_free(TensorInfo*);
void recursive_free(TensorInfo*);
void do_drop(TensorInfo*, bool);
void detach_users(TensorInfo*);
TensorInfo* put_impl(const HostTensorND& value, bool no_cache);
TensorPtr wait_tensor(TensorInfo* info, profiler::TensorProp prop);
void notify_tensor_unsafe(TensorInfo* info);
void process_one_task(IdentifiedCommand&);
void check_worker_exc_unsafe();
......@@ -105,24 +112,31 @@ private:
bool check_available();
void push_scope(std::string, State&);
void pop_scope(std::string, State&);
void assert_in_channel();
void assert_in_worker();
std::thread::id get_worker_tid();
void sync_device_scope(CompNode device);
template <typename TCommand>
void enqueue_command(TCommand&& cmd) {
m_buffer.enqueue(Command{std::forward<TCommand>(cmd)});
}
void sample_on_device(CompNode device, bool force);
// valid => status != Deleted
std::unordered_set<TensorInfo*> collect_valid_tensors();
std::mutex m_mutex;
std::condition_variable m_cv;
MemPool<TensorInfo> m_pool;
std::unordered_set<Handle> m_valid_handle;
TensorInfo* m_waitee = nullptr;
uint64_t m_waitee_id = 0;
std::exception_ptr m_worker_exc;
std::atomic_uint64_t m_last_id = 0;
std::function<void(std::string, std::string)> m_profile_dump_callback;
bool m_closed = false;
......@@ -191,27 +205,98 @@ private:
//! level 0: both sync.
int m_async_level = 2;
struct State {
OptionManager options;
std::vector<std::string> scopes;
std::unique_ptr<InterpreterProfiler> profiler;
struct Scope {
std::string name;
std::unordered_map<std::string, std::unique_ptr<Scope>> children;
size_t version = 0;
size_t parent_version = 0;
size_t tensor_count = 0;
Scope* active_child = nullptr;
Scope* parent = nullptr;
Scope* enter(std::string name) {
auto& child = children[name];
if (!child) {
child = std::make_unique<Scope>();
child->name = name;
child->parent = this;
}
if (version != child->parent_version) {
child->version = 0;
child->parent_version = version;
} else {
child->version++;
}
child->tensor_count = 0;
return active_child = child.get();
}
State() {
profiler = std::make_unique<InterpreterProfiler>();
Scope* exit(std::string name) {
mgb_assert(this->name == name, "scope name mismatch");
parent->active_child = nullptr;
return parent;
}
};
struct ChannelState: State {};
class ScopeManager {
private:
Scope m_root;
Scope* m_current_scope = &m_root;
public:
class ScopeGuard{
private:
ScopeManager* m_manager;
std::string m_name;
public:
ScopeGuard(ScopeManager* manager, std::string name): m_manager{manager}, m_name{name} {
m_manager->push(m_name);
}
~ScopeGuard() {
m_manager->pop(m_name);
}
};
void push(std::string name) {
m_current_scope = m_current_scope->enter(name);
}
void pop(std::string name) {
m_current_scope = m_current_scope->exit(name);
}
std::string next_tensor_name() {
std::string builder;
Scope* scope = &m_root;
while (true) {
builder.append(scope->name);
if (scope->version != 0) {
builder.append(ssprintf("(%ld)", scope->version));
}
if (scope != &m_root) {
builder.append(".");
}
if (scope->active_child == nullptr) {
builder.append(ssprintf(":%%%ld", scope->tensor_count++));
break;
} else {
scope = scope->active_child;
}
}
return builder;
}
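// The generated name is the chain of active scopes joined with '.',
// each scope tagged with its (version) when nonzero, and a trailing
// ":%<n>" taken from the innermost scope's tensor counter.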
};
struct WorkerState: State {
struct State {
std::thread::id tid;
CompNode::UnorderedMap<std::vector<std::string>> device_scope_map;
OptionManager options;
};
struct ChannelState: State {
ScopeManager scopes;
};
struct WorkerState: State {};
ChannelState m_channel_state;
WorkerState m_worker_state;
/*!
* \brief A framework of dynamic sublinear memory optimization
*
......@@ -327,7 +412,6 @@ private:
// assert thread id when call get_xxx_state to avoid misuse
ChannelState& get_channel_state();
WorkerState& get_worker_state();
};
} // namespace mgb::imperative::interpreter::intl
/**
* \file imperative/src/impl/interpreter/profiler.h
* MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#pragma once
#include "megbrain/imperative/profiler.h"
#include "./commands.h"
#include "./events.h"
#include "./option_manager.h"
namespace mgb::imperative::interpreter::intl {
class InterpreterProfiler: public Profiler<
CommandEnqueueEvent, CommandExecuteEvent, CommandFinishEvent,
OpExecuteEvent, OpExecuteFinishEvent,
KernelExecuteEvent, KernelExecuteFinishEvent,
TensorDeclareEvent, TensorProduceEvent, TensorEraseEvent,
TensorGetPropEvent, TensorWaitPropEvent, TensorNotifyPropEvent, TensorWaitPropFinishEvent,
SyncEvent, SyncFinishEvent,
ScopeEvent, ScopeFinishEvent,
DeviceScopeEvent, DeviceScopeFinishEvent> {
public:
enum Topic {
Command = 0b000001,
Operator = 0b000010,
TensorLifetime = 0b000100,
TensorProp = 0b001000,
Sync = 0b010000,
Scope = 0b100000,
};
struct Option {
Topic topic;
bool align_time;
bool show_operator_name;
static Option from_dict(std::unordered_map<std::string, int> dict) {
Option option;
option.topic = Topic(dict.at("topic"));
option.align_time = bool(dict.at("align_time"));
option.show_operator_name = bool(dict.at("show_operator_name"));
return option;
}
};
Option get_option() const {
return m_option;
}
void set_option(const Option& option) {
m_option = option;
}
static Mask topic_to_mask(Topic topic) {
Mask result;
if (topic & Command) {
result |= mask_of<CommandEnqueueEvent, CommandExecuteEvent, CommandFinishEvent>();
}
if (topic & Operator) {
result |= mask_of<OpExecuteEvent, OpExecuteFinishEvent>();
result |= mask_of<KernelExecuteEvent, KernelExecuteFinishEvent>();
}
if (topic & TensorLifetime) {
result |= mask_of<TensorDeclareEvent, TensorProduceEvent, TensorEraseEvent>();
}
if (topic & TensorProp) {
result |= mask_of<TensorGetPropEvent, TensorWaitPropEvent, TensorNotifyPropEvent, TensorWaitPropFinishEvent>();
}
if (topic & Sync) {
result |= mask_of<SyncEvent, SyncFinishEvent>();
}
if (topic & Scope) {
result |= mask_of<ScopeEvent, ScopeFinishEvent>();
result |= mask_of<DeviceScopeEvent, DeviceScopeFinishEvent>();
}
return result;
}
private:
Option m_option;
};
}
......@@ -47,11 +47,15 @@ struct TensorInfo;
using TensorInfoPtr = std::shared_ptr<TensorInfo>;
struct TensorInfo {
enum Prop {
Device, Shape, DType, DevValue, HostValue
enum Status {
InvalidStatus, Allocated, Produced, Swapped, Dropped, Deleted,
};
uint64_t id;
uint64_t id = -1;
std::string name;
// Most attrs of TensorInfo, except `ptr` and `h_value`,
// are read and written in the main thread.
// Lock the interpreter when accessing `ptr`.
TensorPtr ptr;
LogicalTensorDesc desc;
......@@ -59,13 +63,17 @@ struct TensorInfo {
size_t memory;
double last_used_time;
// FIXME: broken by drop
bool value_fetched = false;
bool invalid = false;
bool allow_delete = false;
EvictType evict_type = NONE;
// Status should be only modified in worker thread
Status status = InvalidStatus;
// Used by HostCompute and Memory Swap.
// HostCompute and Swap do not happen in the same thread.
// Maybe a barrier is needed.
HostTensorND h_value;
// reserved for auto drop
......@@ -74,6 +82,10 @@ struct TensorInfo {
size_t ref_cnt = 0;
std::shared_ptr<DsuNode> dsu_ptr;
// Not a reference count; incremented when used as an input
size_t ptr_use_count = 0;
// Used by `Drop` action
struct ComputePath {
uint64_t id;
std::shared_ptr<OpDef> op;
......@@ -126,20 +138,24 @@ struct TensorInfo {
--pinned;
}
void detach_producer() {
// returns true if producer is deleted
bool detach_producer() {
if (!producer) {
return;
return false;
}
auto output = std::find(producer->outputs.begin(), producer->outputs.end(), this);
mgb_assert(output != producer->outputs.end());
*output = nullptr;
bool deleted = false;
if (producer->ref_cnt() == 0) {
for (auto* input: producer->unique_inputs) {
input->users.erase(std::find(input->users.begin(), input->users.end(), producer));
}
delete producer;
deleted = true;
}
producer = nullptr;
return deleted;
}
bool size_exceeds_thd(size_t thd) {
......@@ -150,26 +166,4 @@ struct TensorInfo {
};
}
template <>
struct ToStringTrait<interpreter::intl::TensorInfo::Prop>{
using TensorInfo = interpreter::intl::TensorInfo;
std::string operator()(TensorInfo::Prop prop) const {
switch(prop) {
case TensorInfo::DType:
return "dtype";
case TensorInfo::DevValue:
return "dev_value";
case TensorInfo::Device:
return "device";
case TensorInfo::HostValue:
return "host_value";
case TensorInfo::Shape:
return "shape";
default:
return "unknown";
}
}
};
}
......@@ -22,47 +22,58 @@
#include "./event_pool.h"
#include "./op_trait.h"
#include "./profiler/formats.h"
namespace mgb {
namespace imperative {
namespace {
DeviceTimer::SharedEvent alloc_recorded_event(CompNode device) {
auto event = EventPool::with_timer().alloc_shared(device);
event->record();
return event;
uint64_t Timer::get_nsecs() {
using namespace std::chrono;
auto finish = steady_clock::now();
auto duration = duration_cast<nanoseconds>(finish - m_start);
return duration.count();
}
} // namespace
DeviceTimer::SharedEvent DeviceTimer::get_device_time(CompNode device) {
return alloc_recorded_event(device);
uint64_t Timer::get_started_at() {
return m_started_at;
}
SmallVector<DeviceTimer::SharedEvent> DeviceTimer::get_all(SmallVector<CompNode> device_list) {
SmallVector<DeviceTimer::SharedEvent> results;
for (auto&& device: device_list) {
results.push_back(alloc_recorded_event(device));
}
return results;
void Timer::reset() {
using namespace std::chrono;
m_start = steady_clock::now();
auto now_ns = duration_cast<nanoseconds>(std::chrono::system_clock::now().time_since_epoch());
m_started_at = now_ns.count();
}
double HostTimer::get_msecs() {
using namespace std::chrono;
auto finish = steady_clock::now();
auto duration = duration_cast<microseconds>(finish - m_start);
return (double)duration.count() / 1e3;
std::shared_ptr<CompNode::Event> Timer::record_event(CompNode device) {
auto event = EventPool::with_timer().alloc_shared(device);
event->record();
return event;
}
double HostTimer::get_started_at() {
return m_started_at;
Profiler::options_t Profiler::sm_profile_options;
std::mutex Profiler::sm_mutex;
std::unordered_map<std::thread::id, Profiler*> Profiler::sm_profilers;
Timer Profiler::sm_timer;
std::atomic_uint64_t Profiler::sm_last_id = 0;
bool Profiler::sm_profiling = false;
thread_local std::unique_ptr<Profiler> Profiler::tm_profiler = std::make_unique<Profiler>();
std::atomic_size_t Profiler::sm_preferred_capacity;
auto Profiler::get_thread_dict() -> thread_dict_t {
MGB_LOCK_GUARD(sm_mutex);
thread_dict_t thread_dict;
for (auto&& [tid, profiler]: sm_profilers) {
thread_dict[tid] = profiler->m_thread_name;
}
return thread_dict;
}
void HostTimer::reset() {
using namespace std::chrono;
m_start = steady_clock::now();
auto now_us = duration_cast<microseconds>(std::chrono::system_clock::now().time_since_epoch());
m_started_at = (double)(now_us.count()) / 1e3;
void Profiler::dump_profile(std::string basename, std::string format, results_t results, options_t options) {
auto thread_dict = get_thread_dict();
{
mgb_log_error("unsupported profiling format %s", format.c_str());
}
}
} // namespace imperative
......
#include <string>
#include <memory>
#include "megbrain/utils/json.h"
namespace mgb {
namespace imperative {
class ChromeTraceEvent {
public:
ChromeTraceEvent& name(std::string name) {
m_name = std::move(name);
return *this;
}
ChromeTraceEvent& tid(uint64_t tid) {
m_tid = std::move(tid);
return *this;
}
ChromeTraceEvent& cat(std::string cat) {
m_cat = std::move(cat);
return *this;
}
ChromeTraceEvent& pid(uint64_t pid) {
m_pid = pid;
return *this;
}
ChromeTraceEvent& id(uint64_t id) {
m_id = id;
return *this;
}
ChromeTraceEvent& idx(uint64_t idx) {
m_idx = idx;
return *this;
}
ChromeTraceEvent& ts(double ts) {
m_ts = ts;
return *this;
}
ChromeTraceEvent& dur(double dur) {
m_dur = dur;
return *this;
}
ChromeTraceEvent& ph(char ph) {
m_ph = ph;
return *this;
}
ChromeTraceEvent& bp(char bp) {
m_bp = bp;
return *this;
}
ChromeTraceEvent& args(std::shared_ptr<json::Object> args) {
m_args = std::move(args);
return *this;
}
ChromeTraceEvent& arg(std::string key, std::string value) {
if (!m_args) {
m_args = json::Object::make();
}
(*m_args)[key] = json::String::make(value);
return *this;
}
ChromeTraceEvent& arg(std::string key, double value) {
if (!m_args) {
m_args = json::Object::make();
}
(*m_args)[key] = json::Number::make(value);
return *this;
}
ChromeTraceEvent& arg(std::string key, std::shared_ptr<json::Value> value) {
if (!m_args) {
m_args = json::Object::make();
}
(*m_args)[key] = value;
return *this;
}
std::shared_ptr<json::Object> to_json() const {
auto result = json::Object::make();
auto prop_str = [&](auto key, auto value) {
if (value.empty()) {
return;
}
(*result)[key] = json::String::make(value);
};
auto prop_num = [&](auto key, auto value) {
if (!value) {
return;
}
(*result)[key] = json::Number::make(value.value());
};
auto prop_char = [&](auto key, auto value) {
if (!value) {
return;
}
(*result)[key] = json::String::make(std::string{} + value.value());
};
prop_str("name", m_name);
prop_num("tid", m_tid);
prop_str("cat", m_cat);
prop_num("pid", m_pid);
prop_num("id", m_id);
prop_num("idx", m_idx);
prop_num("ts", m_ts);
prop_num("dur", m_dur);
prop_char("ph", m_ph);
prop_char("bp", m_bp);
if (m_args) {
(*result)["args"] = m_args;
}
return result;
}
private:
std::string m_name;
std::string m_cat;
std::optional<uint64_t> m_tid;
std::optional<uint64_t> m_pid;
std::optional<uint64_t> m_id;
std::optional<uint64_t> m_idx;
std::optional<double> m_ts;
std::optional<double> m_dur;
std::optional<char> m_ph;
std::optional<char> m_bp;
std::shared_ptr<json::Object> m_args;
};
class ChromeTraceEventList {
public:
ChromeTraceEvent& new_event() {
m_content.emplace_back();
return m_content.back();
}
std::shared_ptr<json::Array> to_json() const {
auto result = json::Array::make();
for (auto&& event: m_content) {
result->add(event.to_json());
}
return result;
}
private:
std::vector<ChromeTraceEvent> m_content;
};
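
// Usage sketch (illustrative values): events are built fluently and the
// list serializes into the JSON "traceEvents" array:
//
//     ChromeTraceEventList events;
//     events.new_event().name("Kernel").ph('X').ts(10.0).dur(5.0)
//             .tid(1).pid(1).arg("device", "gpu0");
//     auto array = events.to_json();  // std::shared_ptr<json::Array>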
} // namespace imperative
} // namespace mgb
......@@ -11,65 +11,176 @@
#pragma once
#include "./commands.h"
#include "./tensor_info.h"
#include "megbrain/utils/small_vector.h"
namespace mgb::imperative::interpreter::intl {
#include "../op_trait.h"
namespace mgb::imperative::profiler {
enum class TensorProp {
InvalidProp, Device, Shape, DType, DevValue, HostValue,
};
using OpParams = std::unordered_map<std::string, std::string>;
}
namespace mgb::imperative {
template <>
struct ToStringTrait<profiler::TensorProp>{
using TensorProp = profiler::TensorProp;
std::string operator()(TensorProp prop) const {
switch(prop) {
case TensorProp::DType:
return "dtype";
case TensorProp::DevValue:
return "dev_value";
case TensorProp::Device:
return "device";
case TensorProp::HostValue:
return "host_value";
case TensorProp::Shape:
return "shape";
default:
return "unknown";
}
}
};
}
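A quick check of the trait above (hypothetical call site; ToStringTrait is the primary template this specialization plugs into):

mgb::imperative::ToStringTrait<mgb::imperative::profiler::TensorProp> to_str;
std::string s = to_str(mgb::imperative::profiler::TensorProp::DevValue);  // "dev_value"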
namespace mgb::imperative::profiler {
#define DEF_EVENT(X, ...) struct X##Event __VA_ARGS__;
#define DEF_DUR_EVENT(X, ...) struct X##Event __VA_ARGS__; struct X##FinishEvent __VA_ARGS__;
DEF_EVENT(Command, {
IdentifiedCommand icmd;
DEF_EVENT(OpDispatch, {
uint64_t op_id;
std::string op_name;
std::function<OpParams()> op_params;
SmallVector<uint64_t> inputs;
SmallVector<uint64_t> outputs;
});
DEF_DUR_EVENT(OpInput, {
uint64_t tensor_id;
TensorShape shape;
});
DEF_DUR_EVENT(OpDel, {
uint64_t tensor_id;
TensorShape shape;
});
DEF_DUR_EVENT(OpOutput, {
uint64_t tensor_id;
TensorShape shape;
});
DEF_EVENT(CommandEnqueue, :CommandEvent {});
DEF_EVENT(CommandExecute, :CommandEvent {});
DEF_EVENT(CommandFinish, :CommandEvent {});
DEF_DUR_EVENT(OpExecute, {
uint64_t id;
std::shared_ptr<OpDef> op;
SmallVector<uint64_t> inputs;
SmallVector<uint64_t> outputs;
uint64_t op_id;
});
DEF_DUR_EVENT(OpPostExecute, {
uint64_t op_id;
});
DEF_DUR_EVENT(KernelExecute, {
uint64_t id;
std::shared_ptr<OpDef> op;
SmallVector<uint64_t> inputs;
SmallVector<uint64_t> outputs;
uint64_t op_id;
uint64_t kernel_id;
std::shared_ptr<CompNode::Event> event;
});
DEF_EVENT(TensorDeclare, {
uint64_t tensor_id;
std::string name;
});
DEF_EVENT(TensorProduce, {
uint64_t tensor_id;
TensorLayout layout;
CompNode device;
void* ptr;
});
DEF_EVENT(TensorUsage, {
uint64_t tensor_id;
});
DEF_EVENT(TensorRelease, {
uint64_t tensor_id;
});
DEF_EVENT(TensorErase, {
uint64_t tensor_id;
size_t use_count;
});
DEF_EVENT(TensorGetProp, {
uint64_t tensor_id;
TensorInfo::Prop prop;
std::string prop_desc;
TensorProp prop;
});
DEF_EVENT(TensorNotifyProp, {
uint64_t tensor_id;
uint64_t wait_id;
TensorProp prop;
});
DEF_DUR_EVENT(TensorWaitProp, {
DEF_EVENT(TensorWaitProp, {
uint64_t tensor_id;
TensorInfo::Prop prop;
std::string prop_desc;
uint64_t wait_id;
TensorProp prop;
});
DEF_EVENT(TensorNotifyProp, {
DEF_EVENT(TensorWaitPropFinish, {
uint64_t tensor_id;
TensorInfo::Prop prop;
std::string prop_desc;
uint64_t wait_id;
TensorProp prop;
bool notified;
});
DEF_DUR_EVENT(Sync, {});
DEF_DUR_EVENT(SampleDevice, {
CompNode device;
size_t total_memory;
size_t free_memory;
});
DEF_EVENT(WorkerException, {});
DEF_EVENT(ShapeInfer, {
bool success;
});
DEF_DUR_EVENT(Scope, {
std::string name;
});
DEF_DUR_EVENT(DeviceScope, {
std::string name;
std::shared_ptr<CompNode::Event> event;
});
DEF_DUR_EVENT(Sync, {});
DEF_DUR_EVENT(StartProfile, {
size_t capture_count;
});
DEF_DUR_EVENT(StopProfile, {
size_t escape_count;
});
DEF_DUR_EVENT(TensorCommand, {
enum Kind {
Put, Del, SwapIn, SwapOut, Drop, ReGen, RecFree, GetValue
};
uint64_t tensor_id;
Kind kind;
});
#undef DEF_EVENT
#undef DEF_DUR_EVENT
}
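Hand-expanding the two macros for one event each makes the generated shapes concrete (sketch of the expansion, not code from the diff):

// DEF_EVENT(TensorUsage, {...}) declares a single point event:
struct TensorUsageEvent {
    uint64_t tensor_id;
};

// DEF_DUR_EVENT(Scope, {...}) declares a begin/finish pair with identical
// payload, so durations are reconstructed from two matching records:
struct ScopeEvent {
    std::string name;
};
struct ScopeFinishEvent {
    std::string name;
};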
/**
* \file imperative/src/impl/interpreter/profiler.cpp
* \file imperative/src/impl/interpreter/profiler.h
* MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
*
* Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
......@@ -9,22 +9,12 @@
* "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
#include "./profiler.h"
#pragma once
#include <sstream>
#include <cinttypes>
#include <unordered_set>
#if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
#include <unistd.h>
#elif defined(_WIN32)
#include <process.h>
#else
#error Unsupported platform
#endif
#include "../op_trait.h"
namespace mgb::imperative::interpreter::intl {
#include "megbrain/imperative/profiler.h"
namespace mgb::imperative::profiler {
}
......@@ -6,6 +6,8 @@
#include "megbrain/tensor.h"
#include "./events.h"
namespace mgb::imperative::profiler {
struct ProfileDeviceState {
......@@ -53,6 +55,7 @@ struct ProfileStaticsState {
struct ProfileOperatorState {
uint64_t id;
std::string name;
OpParams params;
SmallVector<uint64_t> inputs;
SmallVector<uint64_t> outputs;
CompNode device;
......
......@@ -47,8 +47,8 @@ struct Interpreter {
virtual size_t get_option(std::string name) = 0;
virtual void set_option(std::string name, size_t value) = 0;
virtual void start_profile(std::unordered_map<std::string, int> option) = 0;
virtual void stop_profile(std::string basename, std::string format) = 0;
virtual void start_profile() = 0;
virtual void stop_profile() = 0;
virtual void push_scope(std::string name) = 0;
virtual void pop_scope(std::string name) = 0;
......
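A hypothetical driver makes the narrower contract visible: the channel now only toggles profiling, while options, collection, and dumping live with the Profiler itself (`chan` and `profile_region` are illustrative names):

template <typename Channel>
void profile_region(Channel& chan) {
    chan.start_profile();          // no options: Profiler::load_options covers them
    chan.push_scope("train_step");
    // ... dispatch ops through chan ...
    chan.pop_scope("train_step");
    chan.stop_profile();           // no path/format: dumping is a separate step
}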
......@@ -17,6 +17,9 @@
#include <fstream>
#include <chrono>
#include <bitset>
#include <deque>
#include <any>
#include <typeindex>
#include "megbrain/comp_node.h"
#include "megbrain/graph/event.h"
......@@ -29,165 +32,188 @@
namespace mgb {
namespace imperative {
class DeviceTimer {
public:
using SharedEvent = std::shared_ptr<CompNode::Event>;
DeviceTimer() = default;
SharedEvent get_device_time(CompNode device);
SmallVector<SharedEvent> get_all(SmallVector<CompNode> device_list);
};
class HostTimer {
class Timer {
public:
void reset();
double get_msecs();
double get_started_at();
uint64_t get_nsecs();
uint64_t get_started_at();
static std::shared_ptr<CompNode::Event> record_event(CompNode device);
private:
decltype(std::chrono::steady_clock::now()) m_start;
double m_started_at;
uint64_t m_started_at;
};
class ProfilerBase {
class Profiler {
public:
using Host = std::thread::id;
using Device = CompNode;
struct HostInstant {
Host tid;
double time;
void wait() const {}
struct Record {
uint64_t id;
uint64_t time; //in ns
std::any data;
};
struct DeviceInstant {
double before;
std::shared_ptr<CompNode::Event> event;
double after;
void wait() const {
event->host_wait();
}
enum Status: uint8_t {
Running = 0,
Recording = 1,
Collecting = 2,
};
using ProfileCollector = std::function<void(std::thread::id, Record)>;
using option_t = uint64_t;
using options_t = std::unordered_map<std::string, option_t>;
using result_t = std::pair<std::thread::id, Record>;
using results_t = std::vector<result_t>;
using thread_dict_t = std::unordered_map<std::thread::id, std::string>;
private:
std::thread::id m_thread_id;
std::vector<Record> m_records;
std::atomic<Status> m_status = Running;
uint64_t m_last_time = 0;
std::string m_thread_name;
static options_t sm_profile_options;
static std::mutex sm_mutex;
static std::unordered_map<std::thread::id, Profiler*> sm_profilers;
static Timer sm_timer;
static std::atomic_uint64_t sm_last_id;
static std::atomic_size_t sm_preferred_capacity;
static bool sm_profiling;
static constexpr bool sm_debug = false;
thread_local static std::unique_ptr<Profiler> tm_profiler;
public:
Profiler() {
m_thread_id = std::this_thread::get_id();
MGB_LOCK_GUARD(sm_mutex);
if (sm_profilers.size() == 0) {
reset();
}
mgb_assert(sm_profilers.count(m_thread_id) == 0);
sm_profilers[m_thread_id] = this;
}
~Profiler() {
MGB_LOCK_GUARD(sm_mutex);
mgb_assert(sm_profilers.count(m_thread_id) == 1);
sm_profilers.erase(m_thread_id);
}
public:
static Profiler& get_instance() {
return *tm_profiler;
}
using Instant = std::variant<HostInstant, DeviceInstant>;
static void reset() {
mgb_assert(sm_profilers.size() == 0, "profiler already running");
sm_timer.reset();
}
template <typename TEvent>
struct EventRecord {
Instant instant;
TEvent data;
static uint64_t next_id() {
return sm_last_id++;
}
const HostInstant& host() const {
return std::get<HostInstant>(instant);
template <typename T, typename... TArgs>
static uint64_t record(TArgs&&... args) {
auto& profiler = get_instance();
auto last_time = profiler.m_last_time;
if constexpr (sm_debug) {
Status expected = Running;
mgb_assert(profiler.m_status.compare_exchange_strong(expected, Recording));
}
uint64_t id = next_id();
uint64_t time = sm_timer.get_nsecs();
time = std::max(time, last_time + 2000);
profiler.m_last_time = time;
profiler.m_records.push_back({id, time, T{std::forward<TArgs>(args)...}});
if constexpr (sm_debug) {
Status expected = Recording;
mgb_assert(profiler.m_status.compare_exchange_strong(expected, Running));
}
return id;
}
const DeviceInstant& device() const {
return std::get<DeviceInstant>(instant);
static results_t collect() {
MGB_LOCK_GUARD(sm_mutex);
if constexpr (sm_debug) {
for (auto&& [tid, profiler]: sm_profilers) {
Status expected = Running;
mgb_assert(profiler->m_status.compare_exchange_strong(expected, Collecting));
}
}
std::vector<std::pair<std::thread::id, Record>> profile_data;
for (auto&& [tid, profiler]: sm_profilers) {
sm_preferred_capacity = std::max(sm_preferred_capacity.load(), profiler->m_records.size());
for (auto& record: profiler->m_records) {
profile_data.push_back({tid, std::move(record)});
}
profiler->m_records.clear();
profiler->m_records.reserve(sm_preferred_capacity);
}
std::sort(profile_data.begin(), profile_data.end(), [](auto& lhs, auto& rhs){
return lhs.second.id < rhs.second.id;
});
if constexpr (sm_debug) {
for (auto&& [tid, profiler]: sm_profilers) {
Status expected = Collecting;
mgb_assert(profiler->m_status.compare_exchange_strong(expected, Running));
}
}
return profile_data;
}
void wait() const {
std::visit([&](const auto& instant){ instant.wait(); }, instant);
static option_t get_option(std::string key, option_t default_val) {
if (!sm_profile_options.count(key)) {
return default_val;
}
return sm_profile_options.at(key);
}
};
protected:
HostInstant record_host() {
return {std::this_thread::get_id(), m_host_timer.get_msecs()};
}
DeviceInstant record_device(Device device) {
auto before = m_host_timer.get_msecs();
auto event = m_device_timer.get_device_time(device);
auto after = m_host_timer.get_msecs();
return {before, event, after};
}
protected:
std::atomic_int64_t m_last_id = 0;
HostTimer m_host_timer;
DeviceTimer m_device_timer;
Spinlock m_lock;
};
static void load_options(options_t options) {
sm_profile_options = std::move(options);
}
template <typename... TEvents>
class Profiler: public ProfilerBase {
public:
using Record = std::variant<EventRecord<TEvents>...>;
using Mask = std::bitset<sizeof...(TEvents)>;
static options_t get_options() {
return sm_profile_options;
}
struct Data {
std::vector<Record> records;
double started_at;
};
static bool is_profiling() {
return sm_profiling;
}
template <typename TEvent, size_t index = 0>
static constexpr size_t index_of() {
if constexpr (index == std::variant_size_v<Record>) {
return index;
} else if constexpr (std::is_same_v<EventRecord<TEvent>, std::variant_alternative_t<index, Record>>) {
return index;
} else {
return index_of<TEvent, index+1>();
static void start_profile() {
mgb_assert(!sm_profiling);
sm_profiling = true;
}
};
template <typename... TEvents2>
static Mask mask_of() {
return Mask{} | (Mask{}.set(index_of<TEvents2>()) |...);
static void stop_profile() {
mgb_assert(sm_profiling);
sm_profiling = false;
}
enum Status {
NotStarted, Profiling, Stopped
};
static thread_dict_t get_thread_dict();
static void dump_profile(std::string basename, std::string format, results_t results, options_t options);
};
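A minimal sketch of the per-thread recording flow (inside namespace mgb::imperative; the scope name is illustrative and assumes the ScopeEvent/ScopeFinishEvent structs from events.h):

void traced_section() {
    if (Profiler::is_profiling()) {
        // record() stamps a monotonic id and a nanosecond timestamp (nudged
        // at least 2us past the previous record on this thread), then stores
        // the event into this thread's local buffer without locking
        Profiler::record<profiler::ScopeEvent>("forward");
        // ... work being measured ...
        Profiler::record<profiler::ScopeFinishEvent>("forward");
    }
}

void flush_records() {
    // merge all per-thread buffers into one list ordered by record id
    Profiler::results_t results = Profiler::collect();
    // hand `results` to a collector (see ProfileDataCollector below)
}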
class ProfileDataCollector {
public:
template <typename TEvent, typename... TArgs>
void record_host(TArgs&&... args) {
MGB_LOCK_GUARD(m_lock);
if (!m_event_mask.test(index_of<TEvent>())) {
return;
}
mgb_assert(m_status != Stopped, "record after stop");
auto instant = HostInstant{std::this_thread::get_id(), m_host_timer.get_msecs()};
m_record_list.emplace_back(EventRecord<TEvent>{std::move(instant), {std::forward<TArgs>(args)...}});
template <typename T>
using SubCollector = std::function<void(uint64_t, std::thread::id, uint64_t, T)>;
private:
std::unordered_map<std::type_index, SubCollector<std::any>> m_collectors;
public:
template <typename T>
ProfileDataCollector& handle(SubCollector<T> collector) {
auto erased = [collector](uint64_t id, std::thread::id tid, uint64_t time, std::any data){
collector(id, tid, time, std::any_cast<T>(std::move(data)));
};
m_collectors[typeid(T)] = erased;
return *this;
}
template <typename TEvent, typename... TArgs>
void record_device(Device device, TArgs&&... args) {
MGB_LOCK_GUARD(m_lock);
if (!m_event_mask.test(index_of<TEvent>())) {
void operator()(uint64_t id, std::thread::id tid, uint64_t time, std::any event) {
std::type_index type = event.type();
if (m_collectors.count(type) == 0) {
return;
}
mgb_assert(m_status != Stopped, "record after stop");
auto before = m_host_timer.get_msecs();
auto event = m_device_timer.get_device_time(device);
auto after = m_host_timer.get_msecs();
auto instant = DeviceInstant{before, event, after};
m_record_list.emplace_back(EventRecord<TEvent>{std::move(instant), {std::forward<TArgs>(args)...}});
}
// unsafe
bool is_profiling() {
return m_status == Profiling;
}
void start(Mask mask) {
MGB_LOCK_GUARD(m_lock);
mgb_assert(m_status == NotStarted, "profiler already started");
m_status = Profiling;
m_event_mask = mask;
m_host_timer.reset();
}
Data stop() {
MGB_LOCK_GUARD(m_lock);
mgb_assert(m_status == Profiling, "profiler not active");
m_status = Stopped;
for (auto&& record: m_record_list) {
std::visit([&](const auto& record){
record.wait();
}, record);
}
auto records = std::move(m_record_list);
return { records, m_host_timer.get_started_at() };
}
protected:
std::vector<Record> m_record_list;
Mask m_event_mask;
std::atomic<Status> m_status = NotStarted;
auto& handler = m_collectors.at(type);
handler(id, tid, time, std::move(event));
}
};
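And a hedged sketch of draining records into typed handlers (the lambda body and printf formatting are illustrative; event types without a registered handler are silently dropped by operator()):

#include <cstdio>

void dump_scopes() {
    ProfileDataCollector collector;
    collector.handle<profiler::ScopeEvent>(
            [](uint64_t id, std::thread::id tid, uint64_t time, profiler::ScopeEvent event) {
                // time is nanoseconds from Profiler's Timer (see record() above)
                std::printf("record %zu: scope %s\n", (size_t)id, event.name.c_str());
            });
    for (auto&& [tid, record] : Profiler::collect()) {
        collector(record.id, tid, record.time, std::move(record.data));
    }
}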
} // namespace imperative
......