diff --git a/imperative/python/megengine/utils/profiler.py b/imperative/python/megengine/utils/profiler.py
index ae70da26fd18abf31bc676a6d22af040a2bc07b8..cf8fd518fa6adb1113104fd4feb78f54ce29b0fe 100644
--- a/imperative/python/megengine/utils/profiler.py
+++ b/imperative/python/megengine/utils/profiler.py
@@ -7,9 +7,14 @@
 # software distributed under the License is distributed on an
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 import json
-from contextlib import contextmanager
+import os
+import re
+from contextlib import ContextDecorator, contextmanager
+from functools import wraps
 from typing import List
+from weakref import WeakSet

+from .. import _atexit
 from ..core._imperative_rt.core2 import (
     pop_scope,
     push_scope,
@@ -17,9 +22,13 @@ from ..core._imperative_rt.core2 import (
     stop_profile,
     sync,
 )
+from ..logger import get_logger

+_running_profiler = None
+_living_profilers = WeakSet()

-class Profiler:
+
+class Profiler(ContextDecorator):
     r"""
     Profile graph execution in imperative mode.

@@ -35,9 +44,10 @@ class Profiler:
         from megengine.utils.profiler import Profiler

         # With Learnable Parameters
+        profiler = Profiler()

         for iter in range(0, 10):
             # Only profile record of last iter would be saved
-            with Profiler("profile"):
+            with profiler:
                 # your code here

         # Then open the profile file in chrome timeline window
@@ -45,46 +55,105 @@ class Profiler:

     CHROME_TIMELINE = "chrome_timeline.json"

-    COMMAND = 1 << 0
-    OPERATOR = 1 << 1
-    TENSOR_LIFETIME = 1 << 2
-    TENSOR_PROP = 1 << 3
-    SYNC = 1 << 4
-    SCOPE = 1 << 5
-    ALL = (1 << 6) - 1
+    valid_options = {"sample_rate": 0, "profile_device": 1, "num_tensor_watch": 10}
+    valid_formats = {"chrome_timeline.json", "memory_flow.svg"}

     def __init__(
         self,
         path: str = "profile",
-        format: str = CHROME_TIMELINE,
-        *,
-        topic=OPERATOR | SCOPE,
-        align_time=True,
-        show_operator_name=True
+        format: str = "chrome_timeline.json",
+        formats: List[str] = None,
+        **kwargs
     ) -> None:
-        self._path = path
-        self._format = format
-        self._options = {
-            "topic": int(topic),
-            "align_time": int(align_time),
-            "show_operator_name": int(show_operator_name),
-        }
+        if not formats:
+            formats = [format]

-    def __enter__(self):
+        assert not isinstance(formats, str), "formats expects list, got str"
+
+        for format in formats:
+            assert format in Profiler.valid_formats, "unsupported format {}".format(
+                format
+            )
+
+        self._path = path
+        self._formats = formats
+        self._options = {}
+        for opt, optval in Profiler.valid_options.items():
+            self._options[opt] = int(kwargs.pop(opt, optval))
+        self._pid = ""
+
+    @property
+    def path(self):
+        if len(self._formats) == 0:
+            format = ""
+        elif len(self._formats) == 1:
+            format = self._formats[0]
+        else:
+            format = "{" + ",".join(self._formats) + "}"
+        return self.format_path(self._path, self._pid, format)
+
+    @property
+    def directory(self):
+        return self._path
+
+    @property
+    def formats(self):
+        return list(self._formats)
+
+    def start(self):
+        global _running_profiler
+
+        assert _running_profiler is None
+        _running_profiler = self
+        self._pid = os.getpid()
         start_profile(self._options)
         return self

-    def __exit__(self, val, tp, trace):
-        stop_profile(self._path, self._format)
-        # dump is async, so it's necessary to sync interpreter
+    def stop(self):
+        global _running_profiler
+
+        assert _running_profiler is self
+        _running_profiler = None
         sync()
+        self._dump_callback = stop_profile()
+        self._pid = os.getpid()
+        _living_profilers.add(self)
+
+    def dump(self):
+        if self._dump_callback is not None:
+            if not os.path.exists(self._path):
+                os.makedirs(self._path)
+            if not os.path.isdir(self._path):
+                get_logger().warning(
+                    "{} is not a directory, cannot write profiling results".format(
+                        self._path
+                    )
+                )
+                return
+            for format in self._formats:
+                path = self.format_path(self._path, self._pid, format)
+                get_logger().info("process {} generating {}".format(self._pid, format))
+                self._dump_callback(path, format)
+                get_logger().info("profiling results written to {}".format(path))
+            self._dump_callback = None
+            _living_profilers.remove(self)
+
+    def format_path(self, path, pid, format):
+        return os.path.join(path, "{}.{}".format(pid, format))
+
+    def __enter__(self):
+        self.start()
+
+    def __exit__(self, val, tp, trace):
+        self.stop()

     def __call__(self, func):
-        def wrapper(*args, **kwargs):
-            with self:
-                return func(*args, **kwargs)
+        func = super().__call__(func)
+        func.__profiler__ = self
+        return func

-        return wrapper
+    def __del__(self):
+        self.dump()


 @contextmanager
@@ -94,16 +163,77 @@ def scope(name):
     pop_scope(name)


-profile = Profiler
+def profile(*args, **kwargs):
+    if len(args) == 1 and len(kwargs) == 0 and callable(args[0]):
+        return Profiler()(args[0])
+    return Profiler(*args, **kwargs)
+
+
+def merge_trace_events(directory: str):
+    names = filter(
+        lambda x: re.match(r"\d+\.chrome_timeline\.json", x), os.listdir(directory)
+    )
+
+    def load_trace_events(name):
+        with open(os.path.join(directory, name), "r", encoding="utf-8") as f:
+            return json.load(f)
+
+    def find_metadata(content):
+        if isinstance(content, dict):
+            assert "traceEvents" in content
+            content = content["traceEvents"]
+        if len(content) == 0:
+            return None
+        assert content[0]["name"] == "Metadata"
+        return content[0]["args"]
+
+    contents = list(map(load_trace_events, names))
+
+    metadata_list = list(map(find_metadata, contents))
+
+    min_local_time = min(
+        map(lambda x: x["localTime"], filter(lambda x: x is not None, metadata_list))
+    )
+
+    events = []
+
+    for content, metadata in zip(contents, metadata_list):
+        local_events = content["traceEvents"]
+        if len(local_events) == 0:
+            continue
+
+        local_time = metadata["localTime"]
+        time_shift = local_time - min_local_time
+
+        for event in local_events:
+            if "ts" in event:
+                event["ts"] = int(event["ts"] + time_shift)
+
+        events.extend(filter(lambda x: x["name"] != "Metadata", local_events))
+
+    result = {
+        "traceEvents": events,
+    }
+
+    path = os.path.join(directory, "merge.chrome_timeline.json")
+
+    with open(path, "w") as f:
+        json.dump(result, f, ensure_ascii=False, separators=(",", ":"))
+
+    get_logger().info("profiling results written to {}".format(path))
+
+
+def is_profiling():
+    return _running_profiler is not None
+
+
+def _stop_current_profiler():
+    global _running_profiler
+    if _running_profiler is not None:
+        _running_profiler.stop()
+    living_profilers = [*_living_profilers]
+    for profiler in living_profilers:
+        profiler.dump()

-def merge_trace_events(sources: List[str], target: str):
-    names = list(map(lambda x: x + ".chrome_timeline.json", sources))
-    result = []
-    for name in names:
-        with open(name, "r", encoding="utf-8") as f:
-            content = json.load(f)
-        for entry in content:
-            result.append(entry)
-    with open(target + ".chrome_timeline.json", "w") as f:
-        json.dump(result, f, ensure_ascii=False, indent=4)
+_atexit(_stop_current_profiler)
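The reworked Python entry points above split profiling into explicit `start()`/`stop()`/`dump()` phases: `stop_profile()` now returns a dump callback that `dump()` replays once per format, and `_atexit` flushes any profiler that was never dumped. A minimal usage sketch of that flow (the directory name and the `sample_rate` value here are illustrative, not defaults mandated by the patch):

```python
from megengine.utils.profiler import Profiler, merge_trace_events

# Results land in <directory>/<pid>.<format>; "prof_out" is a made-up name.
profiler = Profiler("prof_out", formats=["chrome_timeline.json"], sample_rate=0)

for it in range(10):
    # Only the record of the last iteration survives: each stop() replaces
    # the dump callback captured from the previous one.
    with profiler:
        pass  # your code here

profiler.dump()  # invokes the callback returned by stop_profile()
# With several processes writing into the same directory, the per-pid traces
# can be merged afterwards (assuming each dump carries a "Metadata" event):
merge_trace_events("prof_out")
```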
diff --git a/imperative/python/src/tensor.cpp b/imperative/python/src/tensor.cpp
index 28357b22657a45504173108b2ebee5be1faad61e..0ed762f5f49cbe63456a279373b10d66cca44198 100644
--- a/imperative/python/src/tensor.cpp
+++ b/imperative/python/src/tensor.cpp
@@ -13,6 +13,7 @@
 #include "megbrain/common.h"
 #include "megbrain/imperative/ops/utility.h"
 #include "megbrain/imperative/ops/backward_graph.h"
+#include "megbrain/imperative/profiler.h"
 #include "megbrain/opr/io.h"

 #include "./tensor.h"
@@ -927,9 +928,23 @@ void init_tensor(py::module m) {
     m.def("pop_scope",
           [](std::string name) { interpreter_for_py->pop_scope(name); });
     m.def("start_profile",
-          [](std::unordered_map<std::string, int> option) { return interpreter_for_py->start_profile(option); });
+          [](imperative::Profiler::options_t options) {
+              interpreter_for_py->sync();
+              imperative::Profiler::load_options(std::move(options));
+              imperative::Profiler::start_profile();
+              interpreter_for_py->start_profile();
+          });
     m.def("stop_profile",
-          [](std::string basename, std::string format) { interpreter_for_py->stop_profile(basename, format); });
+          []() -> std::function<void(std::string, std::string)> {
+              interpreter_for_py->stop_profile();
+              interpreter_for_py->sync();
+              imperative::Profiler::stop_profile();
+              auto results = imperative::Profiler::collect();
+              auto options = imperative::Profiler::get_options();
+              return [results=std::move(results), options=std::move(options)](std::string basename, std::string format){
+                  imperative::Profiler::dump_profile(basename, format, results, options);
+              };
+          });
     m.def("sync",
           []() {
               interpreter_for_py->sync();
diff --git a/imperative/python/test/integration/test_profiler.py b/imperative/python/test/integration/test_profiler.py
index af8b5c469a85f33c7120da2c08398f4ea0fa8f22..32637d867ad2d7dd633e9e455873a225ddc96d87 100644
--- a/imperative/python/test/integration/test_profiler.py
+++ b/imperative/python/test/integration/test_profiler.py
@@ -8,6 +8,7 @@
 # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied
 import json
 import os
+import tempfile

 import pytest

@@ -28,15 +29,18 @@ class Simple(Module):


 def test_profiler():
-    profile_prefix = "pytest_profile"
+    tempdir = tempfile.NamedTemporaryFile()
+    profile_prefix = tempdir.name
     profile_format = "chrome_timeline.json"
-    profile_path = "{}.{}".format(profile_prefix, profile_format)
-    with Profiler(profile_prefix, format=profile_format):
-        with scope("my_scope"):
-            oup = Simple()(tensor([1.23], dtype="float32"))
+    profile_path = os.path.join(
+        profile_prefix, "{}.{}".format(os.getpid(), profile_format)
+    )
+    with option("enable_host_compute", 0):
+        with Profiler(profile_prefix, format=profile_format):
+            with scope("my_scope"):
+                oup = Simple()(tensor([1.23], dtype="float32"))
     with open(profile_path, "r") as f:
         events = json.load(f)
-    os.remove(profile_path)
     prev_ts = {}
     scope_count = 0
    for event in events:
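The updated test reads the trace back from `<tempdir>/<pid>.chrome_timeline.json` and walks its events. For reference, scope entries appear as chrome-trace `"B"`/`"E"` duration pairs; a standalone sanity check in the same spirit (a hypothetical helper, not part of the test) could be:

```python
import json

def count_scope_pairs(path, name="my_scope"):
    # Assumes the dump is a bare JSON list of events, as the test above reads it.
    with open(path) as f:
        events = json.load(f)
    begins = sum(1 for e in events if e.get("name") == name and e.get("ph") == "B")
    ends = sum(1 for e in events if e.get("name") == name and e.get("ph") == "E")
    assert begins == ends, "unbalanced scope events"
    return begins
```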
diff --git a/imperative/src/impl/interpreter/commands.h b/imperative/src/impl/interpreter/commands.h
index 88db1b065006849b58bbcf3d4a1733ccaa8d4dd8..be51a24bf90eee5dc7eda6b4831cb4d23ade47f1 100644
--- a/imperative/src/impl/interpreter/commands.h
+++ b/imperative/src/impl/interpreter/commands.h
@@ -13,11 +13,14 @@

 #include <deque>
 #include <variant>
+#include <unordered_set>

 #include "megbrain/tensor.h"
 #include "megbrain/imperative/op_def.h"
 #include "megbrain/imperative/utils/to_string.h"

+#include "./tensor_info.h"
+
 namespace mgb::imperative {

 namespace interpreter::intl {
@@ -43,7 +46,7 @@ struct Put {
 };

 struct ApplyOp {
-    uint64_t id;
+    uint64_t id; // used by profiler to identify unique apply
     std::shared_ptr<OpDef> op;
     SmallVector<TensorInfo*> inputs;
     SmallVector<TensorInfo*> outputs;
@@ -143,7 +146,7 @@ struct SetOption {
 };

 struct StartProfile {
-    InterpreterProfiler* profiler;
+    std::unordered_set<TensorInfo*> capture_tensors;

     template <typename TFunctor>
     void get_props(TFunctor&& functor) const {}
@@ -154,14 +157,10 @@ struct StartProfile {
 };

 struct StopProfile {
-    std::string basename;
-    std::string format;
+    std::unordered_set<TensorInfo*> escape_tensors;

     template <typename TFunctor>
-    void get_props(TFunctor&& functor) const {
-        functor("basename", basename);
-        functor("format", format);
-    }
+    void get_props(TFunctor&& functor) const {}

     const char* get_name() const {
         return "StopProfile";
diff --git a/imperative/src/impl/interpreter/events.h b/imperative/src/impl/interpreter/events.h
deleted file mode 100644
index 48e96a20e91ab1d0d3ab761a18291393beda30af..0000000000000000000000000000000000000000
--- a/imperative/src/impl/interpreter/events.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * \file imperative/src/impl/interpreter/events.h
- * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
- *
- * Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-
-#pragma once
-
-#include "./commands.h"
-#include "./tensor_info.h"
-
-namespace mgb::imperative::interpreter::intl {
-
-#define DEF_EVENT(X, ...) struct X##Event __VA_ARGS__;
-#define DEF_DUR_EVENT(X, ...) struct X##Event __VA_ARGS__; struct X##FinishEvent __VA_ARGS__;
-
-DEF_EVENT(Command, {
-    IdentifiedCommand icmd;
-});
-
-DEF_EVENT(CommandEnqueue, :CommandEvent {});
-DEF_EVENT(CommandExecute, :CommandEvent {});
-DEF_EVENT(CommandFinish, :CommandEvent {});
-DEF_DUR_EVENT(OpExecute, {
-    uint64_t id;
-    std::shared_ptr<OpDef> op;
-    SmallVector<uint64_t> inputs;
-    SmallVector<uint64_t> outputs;
-});
-DEF_DUR_EVENT(KernelExecute, {
-    uint64_t id;
-    std::shared_ptr<OpDef> op;
-    SmallVector<uint64_t> inputs;
-    SmallVector<uint64_t> outputs;
-});
-DEF_EVENT(TensorDeclare, {
-    uint64_t tensor_id;
-});
-DEF_EVENT(TensorProduce, {
-    uint64_t tensor_id;
-    TensorLayout layout;
-    CompNode device;
-});
-DEF_EVENT(TensorErase, {
-    uint64_t tensor_id;
-});
-DEF_EVENT(TensorGetProp, {
-    uint64_t tensor_id;
-    TensorInfo::Prop prop;
-    std::string prop_desc;
-});
-DEF_DUR_EVENT(TensorWaitProp, {
-    uint64_t tensor_id;
-    TensorInfo::Prop prop;
-    std::string prop_desc;
-});
-DEF_EVENT(TensorNotifyProp, {
-    uint64_t tensor_id;
-    TensorInfo::Prop prop;
-    std::string prop_desc;
-});
-DEF_DUR_EVENT(Sync, {});
-DEF_DUR_EVENT(Scope, {
-    std::string name;
-});
-DEF_DUR_EVENT(DeviceScope, {
-    std::string name;
-});
-
-}
diff --git a/imperative/src/impl/interpreter/interpreter_impl.cpp b/imperative/src/impl/interpreter/interpreter_impl.cpp
index 9c43eb133e424e1097977b746f1086d04a4160bc..5b5ec7c217d382532975c1f2033c9721ce0b1d35 100644
--- a/imperative/src/impl/interpreter/interpreter_impl.cpp
+++ b/imperative/src/impl/interpreter/interpreter_impl.cpp
@@ -20,19 +20,17 @@
 #include "megbrain/imperative/ops/opr_attr.h"
 #include "megbrain/imperative/utils/to_string.h"

+#include "../event_pool.h"
+#include "../op_trait.h"
+
 using namespace mgb;
 using namespace imperative;
 using namespace interpreter;
 using namespace interpreter::intl;

 #define RECORD_EVENT(type, ...) \
-    if (state.profiler->is_profiling()) { \
-        state.profiler->record_host(type{__VA_ARGS__}); \
-    } \
-
-#define RECORD_DEVICE_EVENT(type, device, ...) \
-    if (state.profiler->is_profiling()) { \
-        state.profiler->record_device((device), type{__VA_ARGS__}); \
+    if (Profiler::is_profiling()) { \
+        Profiler::record(type{__VA_ARGS__}); \
     } \

@@ -46,6 +44,10 @@ namespace {
     };
 }

+namespace mgb {
+    using namespace profiler;
+}
+
 std::thread::id ChannelImpl::get_worker_tid() {
     return m_worker_state.tid;
 }
@@ -60,6 +62,7 @@ ChannelImpl::WorkerState& ChannelImpl::get_worker_state() {
     return m_worker_state;
 }

+// Do not use m_xxx_state directly
 #define m_channel_state
 #define m_worker_state

@@ -74,10 +77,16 @@ Interpreter& Interpreter::inst() {

 Handle ChannelImpl::put(const HostTensorND& value, bool no_cache) {
     mgb_assert(check_available(), "Channel already closed");
+    auto& state = get_channel_state();
+    state.scopes.push("Put");
+    auto info = put_impl(value, no_cache);
+    state.scopes.pop("Put");
+    return info;
+}
+
+TensorInfo* ChannelImpl::put_impl(const HostTensorND& value, bool no_cache) {
     auto info = alloc();
-    info->desc.layout = value.layout();
-    info->desc.comp_node = value.comp_node();
-    info->desc.value = value.proxy_to_default_cpu();
+    init(info, {value.layout(), value.comp_node(), value.proxy_to_default_cpu()});
     info->h_value = value;
     m_buffer.enqueue(Put{info, value, no_cache});
     if (m_async_level == 0) {
@@ -90,11 +99,15 @@ Handle ChannelImpl::put(const HostTensorND& value, bool no_cache) {
 Handle ChannelImpl::put(const DeviceTensorND& data) {
     auto& state = get_channel_state();
     mgb_assert(check_available(), "Channel already closed");
+    state.scopes.push("Put");
     auto info = alloc();
-    info->desc.layout = data.layout();
-    info->desc.comp_node = data.comp_node();
+    RECORD_EVENT(TensorCommandEvent, info->id, TensorCommandEvent::Put);
+    init(info, {data.layout(), data.comp_node()});
     info->ptr = Tensor::make(data);
-    RECORD_EVENT(TensorProduceEvent, info->id, info->desc.layout, info->desc.comp_node);
+    RECORD_EVENT(TensorProduceEvent, info->id, info->desc.layout, info->desc.comp_node, data.raw_ptr());
+    info->status = TensorInfo::Produced;
+    RECORD_EVENT(TensorCommandFinishEvent, info->id, TensorCommandFinishEvent::Put);
+    state.scopes.pop("Put");
     return info;
 }

@@ -148,7 +161,7 @@ void ChannelImpl::dispatch_default_cpu(
         SmallVector<TensorInfo*>* outputs) {
     auto& state = get_channel_state();
     auto [output_descs, validated] = OpDef::infer_output_attrs_fallible(*op, input_descs);
-    MGB_MARK_USED_VAR(validated);
+    RECORD_EVENT(ShapeInferEvent, validated);

     SmallVector<DeviceTensorND> input_tensornds;
     input_tensornds.reserve(input_descs.size());
@@ -166,6 +179,7 @@ void ChannelImpl::dispatch_default_cpu(
         if (info->ptr && info->ptr->try_get_value()) {
             input_tensornds.emplace_back(info->ptr->get_value().proxy_to_default_cpu());
         } else {
+            // It's OK for SwapOut. We assign h_value before dropping ptr
             mgb_assert(!info->h_value.empty(), "inp->h_value is empty!");
             input_tensornds.emplace_back(info->h_value.proxy_to_default_cpu());
         }
@@ -182,8 +196,7 @@ void ChannelImpl::dispatch_default_cpu(
         output_tensornds.emplace_back(HostTensorND(output_cn, desc.layout).proxy_to_default_cpu());
     }

-    auto apply_id = ++m_last_id;
-    RECORD_EVENT(OpExecuteEvent, apply_id, op, tinfo_to_tid(input_infos), {});
+    uint64_t op_id = Profiler::next_id();

     OpDef::apply_on_device_tensornd(*op, input_tensornds, &output_tensornds);

@@ -193,14 +206,20 @@ void ChannelImpl::dispatch_default_cpu(
         HostTensorND host_tensornd = HostTensorND::make_proxy(tensornd)
             .proxy_to_comp_node(output_cn);
         // use `put` for consistency
-        auto info = reinterpret_cast<TensorInfo*>(put(host_tensornd, false));
+        auto info = reinterpret_cast<TensorInfo*>(put_impl(host_tensornd, false));
         mgb_assert(info->desc.layout.ndim != 0);
         output_infos.push_back(info);
         outputs->push_back(info);
     }
-
-    RECORD_EVENT(OpExecuteFinishEvent, apply_id, op,
-        tinfo_to_tid(input_infos), tinfo_to_tid(output_infos));
+    auto op_info_getter = [op]{
+        std::unordered_map<std::string, std::string> op_info;
+        auto props = OpDef::props(*op);
+        for (auto&& [key, value]: props) {
+            op_info[key] = value;
+        }
+        return op_info;
+    };
+    RECORD_EVENT(OpDispatchEvent, op_id, op->trait()->name, op_info_getter, tinfo_to_tid(input_infos), tinfo_to_tid(output_infos));
 }

 void ChannelImpl::dispatch_kernel(
@@ -209,15 +228,22 @@ void ChannelImpl::dispatch_kernel(
         const SmallVector<LogicalTensorDesc>& input_descs,
         SmallVector<TensorInfo*>* outputs) {
     auto& state = get_channel_state();
+    auto& options = state.options;
+
+    auto name = op->trait()->make_name(*op);
+    state.scopes.push(name);
+
     auto [output_descs, validated] = OpDef::infer_output_attrs_fallible(*op, input_descs);
+    RECORD_EVENT(ShapeInferEvent, validated);

-    ApplyOp cmd{++m_last_id, std::move(op)};
+    ApplyOp cmd{Profiler::next_id(), std::move(op)};
     cmd.inputs = std::move(input_infos);
     cmd.outputs.reserve(output_descs.size());
     outputs->reserve(output_descs.size());
-    for (auto&& desc : output_descs) {
+    for (int i = 0; i < output_descs.size(); ++i) {
+        auto&& desc = output_descs[i];
         auto info = alloc();
-        info->desc = desc;
+        init(info, desc);
         // make sure desc's value is consistent with h_value
         if (!info->desc.value.empty()) {
             info->h_value = HostTensorND::make_proxy(desc.value)
@@ -226,10 +252,19 @@ void ChannelImpl::dispatch_kernel(
         cmd.outputs.push_back(info);
         outputs->push_back(info);
     }
+    auto op_info_getter = [op=cmd.op]{
+        std::unordered_map<std::string, std::string> op_info;
+        auto props = OpDef::props(*op);
+        for (auto&& [key, value]: props) {
+            op_info[key] = value;
+        }
+        return op_info;
+    };
+    RECORD_EVENT(OpDispatchEvent, cmd.id, cmd.op->trait()->name, op_info_getter, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
     m_buffer.enqueue(std::move(cmd));
-    if (!validated && state.options.async_level == 1) {
+    if (!validated && options.async_level == 1) {
         sync();
-    } else if (state.options.async_level == 0) {
+    } else if (options.async_level == 0) {
         sync();
         // check device error
         for (auto&& oup : *outputs) {
@@ -237,6 +272,7 @@ void ChannelImpl::dispatch_kernel(
             info->ptr->comp_node().sync();
         }
     }
+    state.scopes.pop(name);
 }

 SmallVector<Handle> ChannelImpl::apply_op(
@@ -282,31 +318,12 @@ SmallVector<Handle> ChannelImpl::apply_op(
 HostTensorND ChannelImpl::get_value(Handle handle) {
     mgb_assert(check_available(), "Channel already closed");
     auto& state = get_channel_state();
-    // TODO: maybe get_value should be done on host. i.e. delete GetValue
     mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
                "invalid handle: %p", handle);
     auto info = reinterpret_cast<TensorInfo*>(handle);
-    mgb_assert(!m_waitee);
     // do not use info->value_fetched, it's unsafe
     mgb_assert(!info->invalid, "Invalid tensor, unable to get_value!");
-    std::unique_lock<std::mutex> lock(m_mutex);
-    TensorPtr tensor_ptr = info->ptr;
-    auto value_fetched = [&]() {
-        return tensor_ptr && tensor_ptr->value_fetched();
-    };
-    if (!value_fetched()) {
-        m_waitee = info;
-        m_buffer.enqueue(GetValue{info});
-        RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::HostValue);
-        m_cv.wait(lock, [&]() {
-            check_worker_exc_unsafe();
-            tensor_ptr = info->ptr;
-            return value_fetched();
-        });
-        RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::HostValue);
-        m_waitee = nullptr;
-    }
-    return tensor_ptr->get_value();
+    return wait_tensor(info, TensorProp::HostValue)->get_value();
 }

 TensorShape ChannelImpl::get_shape(Handle handle) {
@@ -318,18 +335,7 @@ TensorShape ChannelImpl::get_shape(Handle handle) {
     if (info->desc.layout.ndim != 0) {
         return info->desc.layout;
     }
-    std::unique_lock<std::mutex> lock(m_mutex);
-    mgb_assert(!m_waitee);
-    m_waitee = info;
-    m_buffer.flush();
-    RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::Shape);
-    m_cv.wait(lock, [&]() {
-        check_worker_exc_unsafe();
-        return static_cast<bool>(info->ptr);
-    });
-    RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::Shape);
-    m_waitee = nullptr;
-    TensorShape ret = info->ptr->layout();
+    TensorShape ret = wait_tensor(info, TensorProp::Shape)->layout();
     mgb_assert(ret.ndim != 0);
     return ret;
 }
@@ -340,7 +346,7 @@ DType ChannelImpl::get_dtype(Handle handle) {
     mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
                "invalid handle: %p", handle);
     auto info = reinterpret_cast<TensorInfo*>(handle);
-    RECORD_EVENT(TensorGetPropEvent, info->id, TensorInfo::DType);
+    RECORD_EVENT(TensorGetPropEvent, info->id, TensorProp::DType);
     auto ret = info->desc.layout.dtype;
     mgb_assert(ret.valid());
     return ret;
@@ -352,7 +358,7 @@ CompNode ChannelImpl::get_device(Handle handle) {
     mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
               "invalid handle: %p", handle);
     auto info = reinterpret_cast<TensorInfo*>(handle);
-    RECORD_EVENT(TensorGetPropEvent, info->id, TensorInfo::Device);
+    RECORD_EVENT(TensorGetPropEvent, info->id, TensorProp::Device);
     auto ret = info->desc.comp_node;
     mgb_assert(ret.valid());
     return ret;
@@ -364,28 +370,14 @@ DeviceTensorND ChannelImpl::get_dev_tensor(Handle handle) {
     mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
                "invalid handle: %p", handle);
     auto info = reinterpret_cast<TensorInfo*>(handle);
-    std::unique_lock<std::mutex> lock(m_mutex);
-    mgb_assert(!m_waitee);
-    m_waitee = info;
-    m_buffer.flush();
-    RECORD_EVENT(TensorWaitPropEvent, info->id, TensorInfo::DevValue);
-    m_cv.wait(lock, [&]() {
-        check_worker_exc_unsafe();
-        return static_cast<bool>(info->ptr);
-    });
-    RECORD_EVENT(TensorWaitPropFinishEvent, info->id, TensorInfo::DevValue);
-    m_waitee = nullptr;
-    return info->ptr->dev_tensor();
+    return wait_tensor(info, TensorProp::DevValue)->dev_tensor();
 }

 void ChannelImpl::sync() {
     mgb_assert(check_available(), "Channel already closed");
     auto& state = get_channel_state();
     m_buffer.flush();
-    RECORD_EVENT(SyncEvent);
     m_worker.wait_all_task_finish();
-    CompNode::sync_all();
-    RECORD_EVENT(SyncFinishEvent);
     MGB_LOCK_GUARD(m_mutex);
     check_worker_exc_unsafe();
 }
@@ -419,14 +411,24 @@ void ChannelImpl::set_option(std::string name, size_t value) {

 TensorInfo* ChannelImpl::alloc() {
     auto& state = get_channel_state();
-    MGB_LOCK_GUARD(m_mutex);
-    auto info = m_pool.alloc();
-    m_valid_handle.insert(info);
-    info->id = m_last_id++;
-    RECORD_EVENT(TensorDeclareEvent, info->id);
+    auto info = [this]{
+        MGB_LOCK_GUARD(m_mutex);
+        return m_pool.alloc();
+    }();
+    info->id = Profiler::next_id();
+    if (Profiler::is_profiling()) {
+        info->name = state.scopes.next_tensor_name();
+    }
     return info;
 }

+void ChannelImpl::init(TensorInfo* info, LogicalTensorDesc desc) {
+    m_valid_handle.insert(info);
+    RECORD_EVENT(TensorDeclareEvent, info->id, info->name);
+    info->status = TensorInfo::Allocated;
+    info->desc = std::move(desc);
+}
+

 void ChannelImpl::do_drop(TensorInfo* ptr, bool user=false) {
     if (!ptr->producer) {
@@ -439,6 +441,7 @@ void ChannelImpl::do_drop(TensorInfo* ptr, bool user=false) {
         return;
     }
     ptr->evict_type = EvictType::DROP;
+    ptr->status = TensorInfo::Dropped;
     release_tensor(ptr);
 }

@@ -460,7 +463,8 @@ void ChannelImpl::free(TensorInfo* ptr) {
 }

 void ChannelImpl::recursive_free(TensorInfo* ptr) {
-    SmallVector<TensorInfo*> inps(0);
+    RECORD_EVENT(TensorCommandEvent, ptr->id, TensorCommandEvent::RecFree);
+    SmallVector<TensorInfo*> inps;
     if (ptr->producer) {
         for (auto i : ptr->producer->inputs) {
             if (i && --i->ref_cnt == 0) {
@@ -474,17 +478,23 @@ void ChannelImpl::recursive_free(TensorInfo* ptr) {
             recursive_free(i);
         }
     }
+    RECORD_EVENT(TensorCommandFinishEvent, ptr->id, TensorCommandFinishEvent::RecFree);
 }

 void ChannelImpl::real_free(TensorInfo* ptr) {
     auto& state = get_worker_state();
     MGB_LOCK_GUARD(m_mutex);
-    RECORD_EVENT(TensorEraseEvent, ptr->id);
     if (ptr->size_exceeds_thd(state.options.dtr_evictee_minimum_size)) {
         m_dtr.erase_candidate(ptr);
     }
     detach_users(ptr);
     ptr->detach_producer();
+    bool has_value = ptr->ptr != nullptr;
+    if (has_value) {
+        RECORD_EVENT(TensorReleaseEvent, ptr->id);
+    }
+    RECORD_EVENT(TensorEraseEvent, ptr->id, ptr->ptr_use_count);
+    ptr->status = TensorInfo::Deleted;
     m_pool.free(ptr);
 }

@@ -496,46 +506,48 @@ ChannelImpl::~ChannelImpl() {

 void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr, bool notice=true) {
     auto& state = get_worker_state();
-    auto lock = std::unique_lock(m_mutex, std::defer_lock);
+    std::unique_lock<std::mutex> lock{m_mutex, std::defer_lock};
     if (notice) {
         lock.lock();
     }
     m_dtr.update_used_time(dest);
-    if (notice) {
-        RECORD_EVENT(TensorProduceEvent, dest->id, ptr->layout(), ptr->comp_node());
-    }
-    dest->value_fetched = ptr->value_fetched();
+    RECORD_EVENT(TensorProduceEvent, dest->id, ptr->layout(), ptr->comp_node(), ptr->dev_tensor().raw_ptr());
     // update tensor desc for static infer
     dest->desc.layout = ptr->layout();
     dest->desc.comp_node = ptr->comp_node();
     dest->memory = ptr->blob()->size();
     dest->ptr = std::move(ptr);
     dest->evict_type = EvictType::NONE;
+    dest->status = TensorInfo::Produced;
     if (notice && dest->size_exceeds_thd(state.options.dtr_evictee_minimum_size)) {
         m_dtr.insert_candidate(dest);
     }
-    if (notice && m_waitee == dest) {
-        m_cv.notify_all();
+    if (notice) {
+        notify_tensor_unsafe(dest);
     }
 }

 void ChannelImpl::release_tensor(TensorInfo* dest) {
+    RECORD_EVENT(TensorReleaseEvent, dest->id);
     MGB_LOCK_GUARD(m_mutex);
     dest->ptr.reset();
 }

 void ChannelImpl::regenerate(TensorInfo* dest) {
+    RECORD_EVENT(TensorCommandEvent, dest->id, TensorCommandEvent::ReGen);
     if (dest->evict_type == EvictType::DROP) {
         recompute(dest->producer);
     } else if (dest->evict_type == EvictType::SWAP) {
         produce_tensor(dest, Tensor::make(dest->h_value));
     }
+    RECORD_EVENT(TensorCommandFinishEvent, dest->id, TensorCommandFinishEvent::ReGen);
 }

 void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
     using namespace ranges;
     using namespace ranges::views;
     auto& state = get_worker_state();
+    bool profiling_device = Profiler::is_profiling() && Profiler::get_option("profile_device", 0);
     uint64_t apply_id = cmd.id;
     SmallVector<TensorPtr> tensor_inputs;
     if (state.options.enable_dtr_auto_drop) {
@@ -545,33 +557,50 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
         if (!i->ptr && i->evict_type != EvictType::NONE) {
             regenerate(i);
         }
-        // inputs.push_back(i->ptr);
         m_dtr.update_used_time(i);
     }
     tensor_inputs.reserve(cmd.inputs.size());
     // refcnt == 1, owners: [TensorInfo::ptr]
     for (auto i : cmd.inputs) {
         mgb_assert(i->ptr, "Invalid input tensor ptr!");
+        // refcnt ++, owners: [i->ptr, tensor_inputs]
         tensor_inputs.push_back(i->ptr);
     }
+    RECORD_EVENT(OpExecuteEvent, apply_id);
     // Begin profiling operator
-    SmallVector<CompNode> devices;
-    if (state.profiler->is_profiling()) {
+    SmallVector<std::pair<CompNode, uint64_t>> kernels;
+    if (profiling_device) {
+        // Collecting devices
+        SmallVector<CompNode> devices;
         for (auto&& i : concat(cmd.inputs, cmd.outputs)) {
             if (i != nullptr && count(devices, i->desc.comp_node) == 0) {
                 devices.push_back(i->desc.comp_node);
+                kernels.push_back({i->desc.comp_node, Profiler::next_id()});
             }
         }
     }
+    for (auto* input: cmd.inputs) {
+        auto input_id = input->id;
+        RECORD_EVENT(OpInputEvent, input_id);
+        RECORD_EVENT(TensorUsageEvent, input_id);
+        RECORD_EVENT(OpInputFinishEvent, input_id);
+    }
+    // Fused by command buffer. @see: CommandBuffer::fuse_del
+    // Now if dest is inplacable, its refcnt would be decreased to 1 and owned by tensor_inputs after Del.
+    // Note for exprs like 'y = x op x', inplace is unsupported yet but Del would also be fused.
     for (auto* del : cmd.dels) {
+        // refcnt --, owners: [tensor_inputs]
+        // if it's decreased to 1, it would be detected at @see: proxy_graph_detail::apply_on_physical_tensor
+        uint64_t del_id = del->id;
+        RECORD_EVENT(OpDelEvent, del_id);
         free(del);
+        RECORD_EVENT(OpDelFinishEvent, del_id);
     }
-    RECORD_EVENT(OpExecuteEvent, apply_id, cmd.op,
-        tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
-    for (auto&& device: devices) {
-        sync_device_scope(device);
-        RECORD_DEVICE_EVENT(KernelExecuteEvent, device, apply_id, cmd.op,
-            tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
+    // Before wait
+    //TODO: split operator wait and execute so that OpWait could be correctly recorded.
+    // Before execute
+    for (auto&& [device, kernel_id]: kernels) {
+        RECORD_EVENT(KernelExecuteEvent, apply_id, kernel_id, Timer::record_event(device));
     }
     if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) {
         auto_evict();
@@ -579,20 +608,26 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
     // Apply op
     // Here std::move is REQUIRED for removing duplicated references.
     auto tensor_outputs = OpDef::apply_on_physical_tensor(
-        *cmd.op, tensor_inputs);
+        *cmd.op, std::move(tensor_inputs));
     // After execute
-    for (auto&& device : devices) {
-        RECORD_DEVICE_EVENT(KernelExecuteFinishEvent, device, apply_id, cmd.op,
-            tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
+    for (auto&& [device, kernel_id]: kernels) {
+        RECORD_EVENT(KernelExecuteFinishEvent, apply_id, kernel_id, Timer::record_event(device));
     }
-    RECORD_EVENT(OpExecuteFinishEvent, apply_id, cmd.op,
-        tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
     // End profiling operator
     mgb_assert(tensor_outputs.size() == cmd.outputs.size());
     for (size_t i = 0; i < tensor_outputs.size(); ++i) {
         auto output = cmd.outputs[i];
-        if (output != nullptr && output->ptr == nullptr) {
+        if (output == nullptr) {
+            RECORD_EVENT(OpOutputEvent, 0);
+            RECORD_EVENT(OpOutputFinishEvent, 0);
+        } else if (output->ptr != nullptr) {
+            RECORD_EVENT(OpOutputEvent, output->id);
+            RECORD_EVENT(OpOutputFinishEvent, output->id);
+        } else {
+            RECORD_EVENT(OpOutputEvent, output->id);
             produce_tensor(output, tensor_outputs[i]);
+            RECORD_EVENT(OpOutputFinishEvent, output->id);
+            sample_on_device(output->desc.comp_node, false);
         }
     }
@@ -612,6 +647,8 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
         }
         m_dtr.unpin(cmd.inputs);
     }
+    RECORD_EVENT(OpExecuteFinishEvent, apply_id);
+    // End profiling operator
 }

 void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
@@ -637,6 +674,7 @@ void ChannelImpl::auto_evict() {
     }
     size_t current_memory = m_dtr.comp_node.get_used_memory();
     while (current_memory > state.options.dtr_eviction_threshold) {
+        sample_on_device(m_dtr.comp_node, false);
         auto best = m_dtr.find_best_tensor();
         if (!best) {
             if (!m_dtr.warn_printed) {
@@ -656,6 +694,7 @@ void ChannelImpl::auto_evict() {
         if (best->evict_type == EvictType::DROP) {
             m_dtr.update_dsu_after_evict(best);
         }
+        sample_on_device(m_dtr.comp_node, false);
     }
 }

@@ -665,6 +704,10 @@ void ChannelImpl::detach_users(TensorInfo* dest) {
         SmallVector<TensorInfo*> outputs = user->outputs;
         SmallVector<TensorInfo*> inputs = user->inputs;
         for (auto* output: outputs) {
+            // When a `ComputePath` is detached from its input,
+            // there is no need to preserve it,
+            // so we detach all outputs of this path
+            // to decrease its `ref_cnt` to zero.
             if (output == nullptr) {
                 continue;
             }
@@ -674,63 +717,79 @@ void ChannelImpl::detach_users(TensorInfo* dest) {
             input->ref_cnt --;
         }
+        // now user is dead
     }
-    mgb_assert(dest->users.size() == 0);
-    //dest->users.clear();
+    mgb_assert(dest->users.empty(), "ComputePath leaking");
 }

 bool ChannelImpl::check_available() {
     return !m_closed;
 }

-void ChannelImpl::sync_device_scope(CompNode device) {
-    auto& state = get_worker_state();
-    auto& prev = state.device_scope_map[device];
-    auto& current = state.scopes;
-    auto push_scope = [&](std::string name) {
-        RECORD_DEVICE_EVENT(DeviceScopeEvent, device, name);
-    };
-    auto pop_scope = [&](std::string name) {
-        RECORD_DEVICE_EVENT(DeviceScopeFinishEvent, device, name);
-    };
-    size_t similarity = 0;
-    for (size_t i = 0; i < prev.size() && i < current.size(); i++) {
-        if (prev[i] == current[i]) {
-            similarity++;
+TensorPtr ChannelImpl::wait_tensor(TensorInfo* info, TensorProp prop) {
+    m_buffer.flush();
+    std::unique_lock<std::mutex> lock(m_mutex);
+    mgb_assert(!m_waitee, "duplicate waitee");
+    m_waitee = info;
+    m_waitee_id = Profiler::next_id();
+    RECORD_EVENT(TensorWaitPropEvent, info->id, m_waitee_id, prop);
+    bool require_host = prop == TensorProp::HostValue;
+    bool value_fetching = false;
+    m_cv.wait(lock, [&]() {
+        check_worker_exc_unsafe();
+        if (require_host) {
+            if (info->ptr && info->ptr->value_fetched()) {
+                return true;
+            }
+            if (!value_fetching) {
+                m_buffer.enqueue(GetValue{info});
+                value_fetching = true;
+            }
+            return false;
         } else {
-            break;
+            return static_cast<bool>(info->ptr);
         }
+    });
+    RECORD_EVENT(TensorWaitPropFinishEvent, info->id, m_waitee_id, prop, m_waitee == nullptr);
+    if (m_waitee != nullptr) {
+        mgb_assert(m_waitee == info, "waitee mismatch");
+        m_waitee = nullptr;
     }
-    while (prev.size() > similarity) {
-        pop_scope(prev.back());
-        prev.pop_back();
+    return info->ptr;
+}
+
+void ChannelImpl::notify_tensor_unsafe(TensorInfo* info) {
+    if (info == m_waitee) {
+        m_waitee = nullptr;
+        RECORD_EVENT(TensorNotifyPropEvent, info->id);
+        m_cv.notify_all();
     }
-    while (prev.size() < current.size()) {
-        prev.push_back(current[prev.size()]);
-        push_scope(prev.back());
+}
+
+std::unordered_set<TensorInfo*> ChannelImpl::collect_valid_tensors() {
+    std::unordered_set<TensorInfo*> valid_tensors;
+    for (auto* handle: m_valid_handle) {
+        auto* info = reinterpret_cast<TensorInfo*>(handle);
+        valid_tensors.insert(info);
+        //TODO: valid_tensors.insert({info, info->status});
     }
+    return valid_tensors;
 }

 void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
     using namespace ranges;
     using namespace ranges::views;
     auto& state = get_worker_state();
-    RECORD_EVENT(CommandExecuteEvent, icmd);
-    bool finished = false;
-    auto do_finish_command = [&]{
-        if (finished) {
-            return;
-        }
-        RECORD_EVENT(CommandFinishEvent, icmd);
-        finished = true;
-    };
+    auto& options = state.options;
     //TODO: remove std::visit for support osx 10.12
     auto cmd_visitor = [&](const auto& cmd) {
             using T = std::decay_t<decltype(cmd)>;
             if constexpr (std::is_same_v<T, Put>) {
+                RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::Put);
                auto value = cmd.no_cache ? std::make_shared<Tensor>(cmd.value) : Tensor::make(cmd.value);
                 produce_tensor(cmd.dest, std::move(value));
+                RECORD_EVENT(TensorCommandFinishEvent, cmd.dest->id, TensorCommandFinishEvent::Put);
+                sample_on_device(cmd.dest->desc.comp_node, false);
             } else if constexpr (std::is_same_v<T, ApplyOp>) {
                 do_apply_op(cmd);
                 for (size_t i = 0; i < cmd.outputs.size(); ++i) {
@@ -739,7 +798,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
                         continue;
                     }
                     if (state.options.enable_dtr_auto_drop) {
-                        cmd.outputs[i]->dsu_ptr = std::make_shared<DsuNode>(output->compute_time);
+                        output->dsu_ptr = std::make_shared<DsuNode>(output->compute_time);
                     }
                 }
                 if (state.options.enable_drop && state.options.record_computing_path) {
@@ -765,6 +824,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
                     bool cross_cn = any_of(concat(cmd.inputs, cmd.outputs), is_cross_cn);
                     bool inplace = any_of(cartesian_product(cmd.inputs, cmd.outputs), is_inplace);
+
                     if (!inplace && !cross_cn && !m_dtr.is_bad_op(get_name(*cmd.op))) {
                         TensorInfo::ComputePath::make(cmd.id, cmd.op, cmd.inputs, cmd.outputs);
                         size_t detach_cnt = 0;
@@ -780,7 +840,12 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
                     }
                 }
             } else if constexpr (std::is_same_v<T, Del>) {
+                RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::Del);
+                CompNode device = cmd.dest->desc.comp_node;
+                uint64_t tensor_id = cmd.dest->id;
                 free(cmd.dest);
+                RECORD_EVENT(TensorCommandFinishEvent, tensor_id, TensorCommandFinishEvent::Del);
+                sample_on_device(device, false);
             } else if constexpr (std::is_same_v<T, GetValue>) {
                 if (!cmd.dest->ptr && cmd.dest->evict_type != EvictType::NONE) {
                     regenerate(cmd.dest);
@@ -788,50 +853,62 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
                 mgb_assert(cmd.dest->ptr, "Invalid tensor ptr!");
                 cmd.dest->ptr->fetch_value();
                 MGB_LOCK_GUARD(m_mutex);
-                cmd.dest->value_fetched = true;
-                if (m_waitee == cmd.dest) {
-                    m_cv.notify_all();
-                }
+                notify_tensor_unsafe(cmd.dest);
             } else if constexpr (std::is_same_v<T, SwapIn>) {
+                RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::SwapIn);
                 produce_tensor(cmd.dest, Tensor::make(cmd.dest->h_value));
+                RECORD_EVENT(TensorCommandFinishEvent, cmd.dest->id, TensorCommandFinishEvent::SwapIn);
+                sample_on_device(cmd.dest->desc.comp_node, false);
             } else if constexpr (std::is_same_v<T, SwapOut>) {
+                RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::SwapOut);
                 cmd.dest->h_value = cmd.dest->ptr->get_value();
                 if (cmd.dest->evict_type == EvictType::NONE) {
-                    release_tensor(cmd.dest);
                     cmd.dest->evict_type = EvictType::SWAP;
+                    cmd.dest->status = TensorInfo::Swapped;
+                    release_tensor(cmd.dest);
                 }
+                RECORD_EVENT(TensorCommandFinishEvent, cmd.dest->id, TensorCommandFinishEvent::SwapOut);
+                sample_on_device(cmd.dest->desc.comp_node, false);
             } else if constexpr (std::is_same_v<T, Drop>) {
+                RECORD_EVENT(TensorCommandEvent, cmd.dest->id, TensorCommandEvent::Drop);
                 do_drop(cmd.dest, true);
+                RECORD_EVENT(TensorCommandFinishEvent, cmd.dest->id, TensorCommandFinishEvent::Drop);
             } else if constexpr (std::is_same_v<T, SetOption>) {
-                state.options.set_option(cmd.key, cmd.value);
+                options.set_option(cmd.key, cmd.value);
             } else if constexpr (std::is_same_v<T, StartProfile>) {
+                RECORD_EVENT(StartProfileEvent);
                 CompNode::sync_all();
-                state.profiler.reset(cmd.profiler);
+                for (auto* info: cmd.capture_tensors) {
+                    RECORD_EVENT(TensorDeclareEvent, info->id, info->name);
+                    if (info->status == TensorInfo::Produced) {
+                        // TODO: handle swap/drop
+                        RECORD_EVENT(TensorProduceEvent, info->id, info->desc.layout, info->desc.comp_node, info->ptr->dev_tensor().raw_ptr());
+                    }
+                }
+                CompNode::foreach([&](CompNode device){
+                    if (Profiler::get_option("sample_rate", 0)) {
+                        sample_on_device(device, true);
+                    }
+                });
+                RECORD_EVENT(StartProfileFinishEvent);
             } else if constexpr (std::is_same_v<T, StopProfile>) {
-                for (auto&& [device, scopes]: state.device_scope_map) {
-                    MGB_MARK_USED_VAR(scopes);
-                    sync_device_scope(device);
+                RECORD_EVENT(StopProfileEvent);
+                for (auto* info: cmd.escape_tensors) {
+                    bool has_value = info->status == TensorInfo::Produced;
+                    if (has_value) {
+                        RECORD_EVENT(TensorReleaseEvent, info->id);
+                    }
+                    RECORD_EVENT(TensorEraseEvent, info->id);
                 }
-                do_finish_command();
-                auto profiler = std::make_unique<InterpreterProfiler>();
-                std::swap(profiler, state.profiler);
-                auto records = profiler->stop();
-                auto worker_tid = get_worker_tid();
-                auto host_map = [worker_tid](std::thread::id tid) {
-                    if (tid == worker_tid) {
-                        return "worker";
-                    } else {
-                        return "unknown";
+                CompNode::foreach([&](CompNode device){
+                    if (Profiler::get_option("sample_rate", 0)) {
+                        sample_on_device(device, true);
                     }
-                };
+                });
+                RECORD_EVENT(StopProfileFinishEvent);
             } else if constexpr (std::is_same_v<T, PushScope>) {
-                state.scopes.push_back(cmd.scope_name);
-                do_finish_command();
                 RECORD_EVENT(ScopeEvent, cmd.scope_name);
             } else if constexpr (std::is_same_v<T, PopScope>) {
-                mgb_assert(state.scopes.back() == cmd.scope_name, "scope name mismatch");
-                state.scopes.pop_back();
-                do_finish_command();
                 RECORD_EVENT(ScopeFinishEvent, cmd.scope_name);
             } else {
                 static_assert(!std::is_same_v<T, T>);
@@ -839,7 +916,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
     };
     std::visit([&](const auto& cmd){
         using T = std::decay_t<decltype(cmd)>;
-        if (!state.options.catch_worker_execption) {
+        if (!options.catch_worker_execption) {
             cmd_visitor(cmd);
             return;
         }
@@ -855,10 +932,12 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
                 cmd.dest->invalid = true;
             }
             m_worker_exc = std::current_exception();
-            m_cv.notify_all();
+            RECORD_EVENT(WorkerExceptionEvent);
+            if (m_waitee) {
+                notify_tensor_unsafe(m_waitee);
+            }
         }
     }, icmd.second);
-    do_finish_command();
 }

 void ChannelImpl::check_worker_exc_unsafe() {
@@ -888,17 +967,17 @@ void ChannelImpl::CommandBuffer::flush() {

 void ChannelImpl::CommandBuffer::flush(Handle pos) {
     auto& state = m_owner->get_channel_state();
     for (auto iter = m_commands.begin(); iter != pos; ++iter) {
-        // mgb_log_debug("%s Flushed", to_string(*iter).c_str());
-        IdentifiedCommand icmd{++m_owner->m_last_id, std::move(*iter)};
-        RECORD_EVENT(CommandEnqueueEvent, icmd);
-        m_owner->m_worker.add_task(std::move(icmd));
+        if (Profiler::is_profiling()) {
+            mgb_log_debug("%s Flushed", to_string(*iter).c_str());
+        }
+        m_owner->m_worker.add_task(IdentifiedCommand{Profiler::next_id(), std::move(*iter)});
     }
     m_commands.erase(m_commands.begin(), pos);
 }

 auto ChannelImpl::CommandBuffer::flush_pos_for(const Command& cmd) -> Handle {
     auto& state = m_owner->get_channel_state();
-    return std::visit([&, this](const auto& cmd) {
+    return std::visit([this, &state](const auto& cmd) {
         using T = std::decay_t<decltype(cmd)>;
         if constexpr (std::is_same_v<T, ApplyOp>) {
             auto* op_type = cmd.op->dyn_typeinfo();
@@ -986,46 +1065,37 @@ auto ChannelImpl::CommandBuffer::find_produce(TensorInfo* dest, Range range)
     });
 }

-void ChannelImpl::start_profile(std::unordered_map<std::string, int> option) {
+void ChannelImpl::start_profile() {
     mgb_assert(check_available(), "Channel already closed");
-    auto& state = get_channel_state();
-    auto profiler_option = InterpreterProfiler::Option::from_dict(option);
-    auto profiler = std::make_unique<InterpreterProfiler>();
-    profiler->set_option(profiler_option);
-    profiler->start(InterpreterProfiler::topic_to_mask(profiler_option.topic));
-    std::swap(profiler, state.profiler);
-    m_buffer.enqueue(StartProfile{state.profiler.get()});
+    auto capture_tensors = collect_valid_tensors();
+    if (capture_tensors.size() > 0) {
+        m_buffer.enqueue(StartProfile{std::move(capture_tensors)});
+    }
 }

-void ChannelImpl::stop_profile(std::string basename, std::string format) {
+void ChannelImpl::stop_profile() {
     mgb_assert(check_available(), "Channel already closed");
-    auto& state = get_channel_state();
     m_buffer.flush();
-    auto profiler = std::make_unique<InterpreterProfiler>();
-    std::swap(profiler, state.profiler);
-    profiler.release();
-    m_buffer.enqueue(StopProfile{basename, format});
+    auto escape_tensors = collect_valid_tensors();
+    if (escape_tensors.size() > 0) {
+        m_buffer.enqueue(StopProfile{std::move(escape_tensors)});
+    }
 }

 void ChannelImpl::push_scope(std::string name) {
     mgb_assert(check_available(), "Channel already closed");
     auto& state = get_channel_state();
+    state.scopes.push(name);
     RECORD_EVENT(ScopeEvent, name);
-    if (state.profiler->is_profiling()) {
-        state.scopes.push_back(name);
-        m_buffer.enqueue(PushScope{name});
-    }
+    m_buffer.enqueue(PushScope{name});
 }

 void ChannelImpl::pop_scope(std::string name) {
     mgb_assert(check_available(), "Channel already closed");
     auto& state = get_channel_state();
+    state.scopes.pop(name);
     RECORD_EVENT(ScopeFinishEvent, name);
-    if (state.profiler->is_profiling()) {
-        mgb_assert((!state.scopes.empty()) && state.scopes.back() == name, "scope name mismatch");
-        state.scopes.pop_back();
-        m_buffer.enqueue(PopScope{name});
-    }
+    m_buffer.enqueue(PopScope{name});
 }

 void ChannelImpl::assert_in_channel() {
@@ -1036,6 +1106,19 @@ void ChannelImpl::assert_in_worker() {
     mgb_assert(get_worker_tid() == std::this_thread::get_id(),
                "this method can only be called in worker thread");
 }

+void ChannelImpl::sample_on_device(CompNode device, bool force) {
+    if (!force) {
+        thread_local int last_sample_id = 0;
+        int sample_rate = Profiler::is_profiling() ? Profiler::get_option("sample_rate", 0) : 0;
Profiler::get_option("sample_rate", 0) : 0; + if (!sample_rate || ((++last_sample_id) % sample_rate != 0)) { + return; + } + } + RECORD_EVENT(SampleDeviceEvent, device); + auto [total, free] = device.get_mem_status_bytes(); + RECORD_EVENT(SampleDeviceFinishEvent, device, total, free); +} + void ChannelImpl::DynamicSublinear::pin(const SmallVector& vec) { for (auto i : vec) { i->pin(); diff --git a/imperative/src/impl/interpreter/interpreter_impl.h b/imperative/src/impl/interpreter/interpreter_impl.h index b459f54f96c416b3a03df2cf320f0ec8b5c3564b..dec584d147b28d0ea83e9a0879b839497d54bee7 100644 --- a/imperative/src/impl/interpreter/interpreter_impl.h +++ b/imperative/src/impl/interpreter/interpreter_impl.h @@ -24,10 +24,10 @@ #include "megbrain/imperative/profiler.h" #include "./commands.h" -#include "./events.h" #include "./tensor_info.h" #include "./option_manager.h" -#include "./profiler.h" + +#include "../profiler/events.h" namespace mgb::imperative::interpreter::intl { @@ -37,7 +37,6 @@ struct InterpreterImpl : Interpreter { std::unique_ptr create_channel() override; }; - struct ChannelImpl : Interpreter::Channel { ChannelImpl(); ~ChannelImpl() override; @@ -67,19 +66,27 @@ struct ChannelImpl : Interpreter::Channel { size_t get_option(std::string name) override; void set_option(std::string name, size_t value) override; - void start_profile(std::unordered_map option) override; - void stop_profile(std::string basename, std::string format) override; + void start_profile() override; + void stop_profile() override; void push_scope(std::string) override; void pop_scope(std::string) override; private: + struct WorkQueue; + struct State; + TensorInfo* alloc(); + void init(TensorInfo*, LogicalTensorDesc desc); void free(TensorInfo*); void real_free(TensorInfo*); void recursive_free(TensorInfo*); void do_drop(TensorInfo*, bool); void detach_users(TensorInfo*); + TensorInfo* put_impl(const HostTensorND& value, bool no_cache); + TensorPtr wait_tensor(TensorInfo* info, profiler::TensorProp prop); + void notify_tensor_unsafe(TensorInfo* info); + void process_one_task(IdentifiedCommand&); void check_worker_exc_unsafe(); @@ -105,24 +112,31 @@ private: bool check_available(); + void push_scope(std::string, State&); + void pop_scope(std::string, State&); + void assert_in_channel(); void assert_in_worker(); std::thread::id get_worker_tid(); - void sync_device_scope(CompNode device); - template void enqueue_command(TCommand&& cmd) { m_buffer.enqueue(Command{std::forward(cmd)}); } + void sample_on_device(CompNode device, bool force); + + // valid => status != Deleted + std::unordered_set collect_valid_tensors(); + std::mutex m_mutex; std::condition_variable m_cv; MemPool m_pool; std::unordered_set m_valid_handle; TensorInfo* m_waitee = nullptr; + uint64_t m_waitee_id = 0; std::exception_ptr m_worker_exc; - std::atomic_uint64_t m_last_id = 0; + std::function m_profile_dump_callback; bool m_closed = false; @@ -191,27 +205,98 @@ private: //! level 0: both sync. 
     int m_async_level = 2;

-    struct State {
-        OptionManager options;
-        std::vector<std::string> scopes;
-        std::unique_ptr<InterpreterProfiler> profiler;
+    struct Scope {
+        std::string name;
+        std::unordered_map<std::string, std::unique_ptr<Scope>> children;
+        size_t version = 0;
+        size_t parent_version = 0;
+        size_t tensor_count = 0;
+        Scope* active_child = nullptr;
+        Scope* parent = nullptr;
+
+        Scope* enter(std::string name) {
+            auto& child = children[name];
+            if (!child) {
+                child = std::make_unique<Scope>();
+                child->name = name;
+                child->parent = this;
+            }
+            if (version != child->parent_version) {
+                child->version = 0;
+                child->parent_version = version;
+            } else {
+                child->version++;
+            }
+            child->tensor_count = 0;
+            return active_child = child.get();
+        }

-        State() {
-            profiler = std::make_unique<InterpreterProfiler>();
+        Scope* exit(std::string name) {
+            mgb_assert(this->name == name, "scope name mismatch");
+            parent->active_child = nullptr;
+            return parent;
         }
     };

-    struct ChannelState: State {};
+    class ScopeManager {
+    private:
+        Scope m_root;
+        Scope* m_current_scope = &m_root;
+    public:
+        class ScopeGuard{
+        private:
+            ScopeManager* m_manager;
+            std::string m_name;
+        public:
+            ScopeGuard(ScopeManager* manager, std::string name): m_manager{manager}, m_name{name} {
+                m_manager->push(m_name);
+            }
+            ~ScopeGuard() {
+                m_manager->pop(m_name);
+            }
+        };
+        void push(std::string name) {
+            m_current_scope = m_current_scope->enter(name);
+        }
+        void pop(std::string name) {
+            m_current_scope = m_current_scope->exit(name);
+        }
+        std::string next_tensor_name() {
+            std::string builder;
+            Scope* scope = &m_root;
+            while (true) {
+                builder.append(scope->name);
+                if (scope->version != 0) {
+                    builder.append(ssprintf("(%ld)", scope->version));
+                }
+                if (scope != &m_root) {
+                    builder.append(".");
+                }
+                if (scope->active_child == nullptr) {
+                    builder.append(ssprintf(":%%%ld", scope->tensor_count++));
+                    break;
+                } else {
+                    scope = scope->active_child;
+                }
+            }
+            return builder;
+        }
+    };

-    struct WorkerState: State {
+    struct State {
         std::thread::id tid;
-        CompNode::UnorderedMap<std::vector<std::string>> device_scope_map;
+        OptionManager options;
     };

+    struct ChannelState: State {
+        ScopeManager scopes;
+    };
+
+    struct WorkerState: State {};
+
     ChannelState m_channel_state;
     WorkerState m_worker_state;
-
     /*!
     * \brief A framework of dynamic sublinear memory optimization
    *
@@ -327,7 +412,6 @@ private:
     // assert thread id when call get_xxx_state to avoid misuse
     ChannelState& get_channel_state();
     WorkerState& get_worker_state();
-
 };

 } // namespace mgb::imperative::interpreter::intl
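The `ScopeManager` above gives every tensor allocated during profiling a hierarchical name: each scope level contributes `name` or `name(version)`, non-root levels append a `.`, and the innermost active scope appends a `:%<n>` tensor counter. A rough Python rendition of `next_tensor_name` (illustrative only, not the patch's code) shows the resulting format:

```python
def next_tensor_name(scopes, counter):
    # scopes: (name, version) pairs from the root (whose name is "") down to
    # the innermost active scope; mirrors the C++ loop above, including the
    # trailing "." a non-root level appends before the counter.
    builder = ""
    for depth, (name, version) in enumerate(scopes):
        builder += name
        if version != 0:
            builder += "({})".format(version)
        if depth != 0:
            builder += "."
    return builder + ":%{}".format(counter)

# First tensor created inside the second entry of a scope named "step":
assert next_tensor_name([("", 0), ("step", 1)], 0) == "step(1).:%0"
```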
diff --git a/imperative/src/impl/interpreter/profiler.h b/imperative/src/impl/interpreter/profiler.h
deleted file mode 100644
index 538a3540fc91e53e6328c400ad4db4b3876ff01a..0000000000000000000000000000000000000000
--- a/imperative/src/impl/interpreter/profiler.h
+++ /dev/null
@@ -1,93 +0,0 @@
-/**
- * \file imperative/src/impl/interpreter/profiler.h
- * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
- *
- * Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-
-#pragma once
-
-#include "megbrain/imperative/profiler.h"
-
-#include "./commands.h"
-#include "./events.h"
-#include "./option_manager.h"
-
-namespace mgb::imperative::interpreter::intl {
-
-class InterpreterProfiler: public Profiler<
-        CommandEnqueueEvent, CommandExecuteEvent, CommandFinishEvent,
-        OpExecuteEvent, OpExecuteFinishEvent,
-        KernelExecuteEvent, KernelExecuteFinishEvent,
-        TensorDeclareEvent, TensorProduceEvent, TensorEraseEvent,
-        TensorGetPropEvent, TensorWaitPropEvent, TensorNotifyPropEvent, TensorWaitPropFinishEvent,
-        SyncEvent, SyncFinishEvent,
-        ScopeEvent, ScopeFinishEvent,
-        DeviceScopeEvent, DeviceScopeFinishEvent> {
-
-public:
-    enum Topic {
-        Command         = 0b000001,
-        Operator        = 0b000010,
-        TensorLifetime  = 0b000100,
-        TensorProp      = 0b001000,
-        Sync            = 0b010000,
-        Scope           = 0b100000,
-    };
-
-    struct Option {
-        Topic topic;
-        bool align_time;
-        bool show_operator_name;
-
-        static Option from_dict(std::unordered_map<std::string, int> dict) {
-            Option option;
-            option.topic = Topic(dict.at("topic"));
-            option.align_time = bool(dict.at("align_time"));
-            option.show_operator_name = bool(dict.at("show_operator_name"));
-            return option;
-        }
-    };
-
-    Option get_option() const {
-        return m_option;
-    }
-
-    void set_option(const Option& option) {
-        m_option = option;
-    }
-
-    static Mask topic_to_mask(Topic topic) {
-        Mask result;
-        if (topic & Command) {
-            result |= mask_of<CommandEnqueueEvent, CommandExecuteEvent, CommandFinishEvent>();
-        }
-        if (topic & Operator) {
-            result |= mask_of<OpExecuteEvent, OpExecuteFinishEvent>();
-            result |= mask_of<KernelExecuteEvent, KernelExecuteFinishEvent>();
-        }
-        if (topic & TensorLifetime) {
-            result |= mask_of<TensorDeclareEvent, TensorProduceEvent, TensorEraseEvent>();
-        }
-        if (topic & TensorProp) {
-            result |= mask_of<TensorGetPropEvent, TensorWaitPropEvent, TensorNotifyPropEvent, TensorWaitPropFinishEvent>();
-        }
-        if (topic & Sync) {
-            result |= mask_of<SyncEvent, SyncFinishEvent>();
-        }
-        if (topic & Scope) {
-            result |= mask_of<ScopeEvent, ScopeFinishEvent>();
-            result |= mask_of<DeviceScopeEvent, DeviceScopeFinishEvent>();
-        }
-        return result;
-    }
-
-private:
-    Option m_option;
-};
-
-}
diff --git a/imperative/src/impl/interpreter/tensor_info.h b/imperative/src/impl/interpreter/tensor_info.h
index 11e98a15df38969a71b332b6b87d9289b12bf1ae..5a3067ca056e55a86423d0b959ee5e9147850101 100644
--- a/imperative/src/impl/interpreter/tensor_info.h
+++ b/imperative/src/impl/interpreter/tensor_info.h
@@ -27,19 +27,19 @@ enum EvictType {

 /*!
  * \brief an identifier to specify a component of evicted tensors
- * 
+ *
  * Each component tracks the sum of the compute costs of its elements, with the
 * union of two components having the sum of each constituent cost.
 */
 struct DsuNode {
     DsuNode(double _t): t(_t) {}
-    
+
     std::shared_ptr<DsuNode> parent;

     bool is_root() {
         return !bool(parent);
     }
-    
+
     double t;
 };

@@ -47,25 +47,33 @@ struct TensorInfo;
 using TensorInfoPtr = std::shared_ptr<TensorInfo>;

 struct TensorInfo {
-    enum Prop {
-        Device, Shape, DType, DevValue, HostValue
+    enum Status {
+        InvalidStatus, Allocated, Produced, Swapped, Dropped, Deleted,
     };

-    uint64_t id;
+    uint64_t id = -1;
+    std::string name;
+
+    // Most attrs of TensorInfo, except `ptr` and `h_value`,
+    // are read and written in the main thread.
+    // Lock the interpreter when visiting `ptr`.
     TensorPtr ptr;
     LogicalTensorDesc desc;

     double compute_time;
     size_t memory;
     double last_used_time;
-
-    // FIXME: broken by drop
-    bool value_fetched = false;
+
     bool invalid = false;
     bool allow_delete = false;

     EvictType evict_type = NONE;

+    // Status should only be modified in the worker thread
+    Status status = InvalidStatus;
+
+    // Used by HostCompute and Memory Swap.
+    // HostCompute and Swap do not happen in one thread.
+    // Maybe a barrier is needed.
     HostTensorND h_value;

     // reserved for auto drop
@@ -74,6 +82,10 @@ struct TensorInfo {
     size_t ref_cnt = 0;
     std::shared_ptr<DsuNode> dsu_ptr;

+    // Not a reference count; incremented when used as input
+    size_t ptr_use_count = 0;
+
+    // Used by `Drop` action
     struct ComputePath {
         uint64_t id;
         std::shared_ptr<OpDef> op;
         SmallVector<TensorInfo*> inputs;
         SmallVector<TensorInfo*> outputs;
@@ -111,7 +123,7 @@ struct TensorInfo {
             return path;
         }
     }* producer = nullptr;
-    
+
     double eval_func(double cost, double free_mem, double cur_time,
             double param_cost, double param_mem, double param_time, double param_recompute_times) {
         return pow(cost + 1e-3, param_cost) * pow(param_recompute_times, (double)recompute_times)
@@ -126,20 +138,24 @@ struct TensorInfo {
         --pinned;
     }

-    void detach_producer() {
+    // returns true if producer is deleted
+    bool detach_producer() {
         if (!producer) {
-            return;
+            return false;
         }
         auto output = std::find(producer->outputs.begin(), producer->outputs.end(), this);
         mgb_assert(output != producer->outputs.end());
         *output = nullptr;
+        bool deleted = false;
         if (producer->ref_cnt() == 0) {
             for (auto* input: producer->unique_inputs) {
                 input->users.erase(std::find(input->users.begin(), input->users.end(), producer));
             }
             delete producer;
+            deleted = true;
         }
         producer = nullptr;
+        return deleted;
     }

     bool size_exceeds_thd(size_t thd) {
@@ -150,26 +166,4 @@ struct TensorInfo {
 };

 }

-template <>
-struct ToStringTrait<interpreter::intl::TensorInfo::Prop> {
-    using TensorInfo = interpreter::intl::TensorInfo;
-
-    std::string operator()(TensorInfo::Prop prop) const {
-        switch(prop) {
-            case TensorInfo::DType:
-                return "dtype";
-            case TensorInfo::DevValue:
-                return "dev_value";
-            case TensorInfo::Device:
-                return "device";
-            case TensorInfo::HostValue:
-                return "host_value";
-            case TensorInfo::Shape:
-                return "shape";
-            default:
-                return "unknown";
-        }
-    }
-};
-
 }
diff --git a/imperative/src/impl/profiler.cpp b/imperative/src/impl/profiler.cpp
index 7aeb5674f20485033fe0639a1e153743beee39a1..2c9f6c275bf154db8347717c03554397e7c7e32d 100644
--- a/imperative/src/impl/profiler.cpp
+++ b/imperative/src/impl/profiler.cpp
@@ -22,47 +22,58 @@
 #include "./event_pool.h"
 #include "./op_trait.h"

+#include "./profiler/formats.h"
+
 namespace mgb {
 namespace imperative {

-namespace {
-
-DeviceTimer::SharedEvent alloc_recorded_event(CompNode device) {
-    auto event = EventPool::with_timer().alloc_shared(device);
-    event->record();
-    return event;
+uint64_t Timer::get_nsecs() {
+    using namespace std::chrono;
+    auto finish = steady_clock::now();
+    auto duration = duration_cast<nanoseconds>(finish - m_start);
+    return duration.count();
 }

-} // namespace
-
-DeviceTimer::SharedEvent DeviceTimer::get_device_time(CompNode device) {
-    return alloc_recorded_event(device);
+uint64_t Timer::get_started_at() {
+    return m_started_at;
 }

-SmallVector<DeviceTimer::SharedEvent> DeviceTimer::get_all(SmallVector<CompNode> device_list) {
-    SmallVector<DeviceTimer::SharedEvent> results;
-    for (auto&& device: device_list) {
-        results.push_back(alloc_recorded_event(device));
-    }
-    return results;
+void Timer::reset() {
+    using namespace std::chrono;
+    m_start = steady_clock::now();
+    auto now_ns = duration_cast<nanoseconds>(std::chrono::system_clock::now().time_since_epoch());
+    m_started_at = now_ns.count();
 }

-double HostTimer::get_msecs() {
-    using namespace std::chrono;
-    auto finish = steady_clock::now();
-    auto duration = duration_cast<microseconds>(finish - m_start);
-    return (double)duration.count() / 1e3;
+std::shared_ptr<CompNode::Event> Timer::record_event(CompNode device) {
+    auto event = EventPool::with_timer().alloc_shared(device);
+    event->record();
+    return event;
 }

-double HostTimer::get_started_at() {
-    return m_started_at;
diff --git a/imperative/src/impl/profiler.cpp b/imperative/src/impl/profiler.cpp
index 7aeb5674f20485033fe0639a1e153743beee39a1..2c9f6c275bf154db8347717c03554397e7c7e32d 100644
--- a/imperative/src/impl/profiler.cpp
+++ b/imperative/src/impl/profiler.cpp
@@ -22,47 +22,58 @@
 #include "./event_pool.h"
 #include "./op_trait.h"
 
+#include "./profiler/formats.h"
+
 namespace mgb {
 namespace imperative {
 
-namespace {
-
-DeviceTimer::SharedEvent alloc_recorded_event(CompNode device) {
-    auto event = EventPool::with_timer().alloc_shared(device);
-    event->record();
-    return event;
+uint64_t Timer::get_nsecs() {
+    using namespace std::chrono;
+    auto finish = steady_clock::now();
+    auto duration = duration_cast<nanoseconds>(finish - m_start);
+    return duration.count();
 }
 
-} // namespace
-
-DeviceTimer::SharedEvent DeviceTimer::get_device_time(CompNode device) {
-    return alloc_recorded_event(device);
+uint64_t Timer::get_started_at() {
+    return m_started_at;
 }
 
-SmallVector<DeviceTimer::SharedEvent> DeviceTimer::get_all(SmallVector<CompNode> device_list) {
-    SmallVector<SharedEvent> results;
-    for (auto&& device: device_list) {
-        results.push_back(alloc_recorded_event(device));
-    }
-    return results;
+void Timer::reset() {
+    using namespace std::chrono;
+    m_start = steady_clock::now();
+    auto now_ns = duration_cast<nanoseconds>(std::chrono::system_clock::now().time_since_epoch());
+    m_started_at = now_ns.count();
 }
 
-double HostTimer::get_msecs() {
-    using namespace std::chrono;
-    auto finish = steady_clock::now();
-    auto duration = duration_cast<microseconds>(finish - m_start);
-    return (double)duration.count() / 1e3;
+std::shared_ptr<CompNode::Event> Timer::record_event(CompNode device) {
+    auto event = EventPool::with_timer().alloc_shared(device);
+    event->record();
+    return event;
 }
 
-double HostTimer::get_started_at() {
-    return m_started_at;
+Profiler::options_t Profiler::sm_profile_options;
+std::mutex Profiler::sm_mutex;
+std::unordered_map<std::thread::id, Profiler*> Profiler::sm_profilers;
+Timer Profiler::sm_timer;
+std::atomic_uint64_t Profiler::sm_last_id = 0;
+bool Profiler::sm_profiling = false;
+thread_local std::unique_ptr<Profiler> Profiler::tm_profiler = std::make_unique<Profiler>();
+std::atomic_size_t Profiler::sm_preferred_capacity;
+
+auto Profiler::get_thread_dict() -> thread_dict_t {
+    MGB_LOCK_GUARD(sm_mutex);
+    thread_dict_t thread_dict;
+    for (auto&& [tid, profiler]: sm_profilers) {
+        thread_dict[tid] = profiler->m_thread_name;
+    }
+    return thread_dict;
 }
 
-void HostTimer::reset() {
-    using namespace std::chrono;
-    m_start = steady_clock::now();
-    auto now_us = duration_cast<microseconds>(std::chrono::system_clock::now().time_since_epoch());
-    m_started_at = (double)(now_us.count()) / 1e3;
+void Profiler::dump_profile(std::string basename, std::string format, results_t results, options_t options) {
+    auto thread_dict = get_thread_dict();
+    {
+        mgb_log_error("unsupported profiling format %s", format.c_str());
+    }
 }
 
 } // namespace imperative
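The new Timer pairs a monotonic nanosecond counter (steady_clock, immune to wall-clock adjustments) with a wall-clock anchor captured at reset(). Records are ordered by the monotonic clock, while the anchor lets timelines recorded by different processes be shifted onto a common axis. A standalone sketch of that contract (not the MegEngine class itself):

```cpp
#include <chrono>
#include <cstdint>
#include <iostream>

class Timer {
public:
    void reset() {
        using namespace std::chrono;
        m_start = steady_clock::now();
        // Wall-clock anchor for the monotonic origin, in nanoseconds.
        m_started_at = duration_cast<nanoseconds>(
                system_clock::now().time_since_epoch()).count();
    }
    uint64_t get_nsecs() const {
        using namespace std::chrono;
        // Monotonic nanoseconds since reset(); safe for event ordering.
        return duration_cast<nanoseconds>(steady_clock::now() - m_start).count();
    }
    uint64_t get_started_at() const { return m_started_at; }

private:
    std::chrono::steady_clock::time_point m_start;
    uint64_t m_started_at = 0;
};

int main() {
    Timer timer;
    timer.reset();
    // ... workload ...
    // An event stamped at get_nsecs() sits on a cross-process axis at
    // get_started_at() + get_nsecs().
    std::cout << timer.get_started_at() + timer.get_nsecs() << "\n";
}
```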
prop_num("ts", m_ts); - prop_num("dur", m_dur); - prop_char("ph", m_ph); - prop_char("bp", m_bp); - if (m_args) { - (*result)["args"] = m_args; - } - return result; - } -private: - std::string m_name; - std::string m_cat; - - std::optional m_tid; - std::optional m_pid; - std::optional m_id; - std::optional m_idx; - std::optional m_ts; - std::optional m_dur; - std::optional m_ph; - std::optional m_bp; - std::shared_ptr m_args; -}; - -class ChromeTraceEventList { -public: - ChromeTraceEvent& new_event() { - m_content.emplace_back(); - return m_content.back(); - } - - std::shared_ptr to_json() const { - auto result = json::Array::make(); - for (auto&& event: m_content) { - result->add(event.to_json()); - } - return result; - } -private: - std::vector m_content; -}; - -} // namespace imperative -} // namespace mgb diff --git a/imperative/src/impl/profiler/events.h b/imperative/src/impl/profiler/events.h new file mode 100644 index 0000000000000000000000000000000000000000..c51ffe2fa48801dce9a1e27194a679358cdc0546 --- /dev/null +++ b/imperative/src/impl/profiler/events.h @@ -0,0 +1,186 @@ +/** + * \file imperative/src/impl/interpreter/events.h + * MegEngine is Licensed under the Apache License, Version 2.0 (the "License") + * + * Copyright (c) 2014-2020 Megvii Inc. All rights reserved. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + */ + +#pragma once + +#include "megbrain/utils/small_vector.h" + +#include "../op_trait.h" + +namespace mgb::imperative::profiler { + +enum class TensorProp { + InvalidProp, Device, Shape, DType, DevValue, HostValue, +}; + +using OpParams = std::unordered_map; + +} + +namespace mgb::imperative { + +template <> +struct ToStringTrait{ + using TensorProp = profiler::TensorProp; + std::string operator()(TensorProp prop) const { + switch(prop) { + case TensorProp::DType: + return "dtype"; + case TensorProp::DevValue: + return "dev_value"; + case TensorProp::Device: + return "device"; + case TensorProp::HostValue: + return "host_value"; + case TensorProp::Shape: + return "shape"; + default: + return "unknown"; + } + } +}; + +} + +namespace mgb::imperative::profiler { + +#define DEF_EVENT(X, ...) struct X##Event __VA_ARGS__; +#define DEF_DUR_EVENT(X, ...) 
diff --git a/imperative/src/impl/profiler/events.h b/imperative/src/impl/profiler/events.h
new file mode 100644
index 0000000000000000000000000000000000000000..c51ffe2fa48801dce9a1e27194a679358cdc0546
--- /dev/null
+++ b/imperative/src/impl/profiler/events.h
@@ -0,0 +1,186 @@
+/**
+ * \file imperative/src/impl/profiler/events.h
+ * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
+ *
+ * Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+#pragma once
+
+#include "megbrain/utils/small_vector.h"
+
+#include "../op_trait.h"
+
+namespace mgb::imperative::profiler {
+
+enum class TensorProp {
+    InvalidProp, Device, Shape, DType, DevValue, HostValue,
+};
+
+using OpParams = std::unordered_map<std::string, std::string>;
+
+}
+
+namespace mgb::imperative {
+
+template <>
+struct ToStringTrait<profiler::TensorProp> {
+    using TensorProp = profiler::TensorProp;
+    std::string operator()(TensorProp prop) const {
+        switch(prop) {
+            case TensorProp::DType:
+                return "dtype";
+            case TensorProp::DevValue:
+                return "dev_value";
+            case TensorProp::Device:
+                return "device";
+            case TensorProp::HostValue:
+                return "host_value";
+            case TensorProp::Shape:
+                return "shape";
+            default:
+                return "unknown";
+        }
+    }
+};
+
+}
+
+namespace mgb::imperative::profiler {
+
+#define DEF_EVENT(X, ...) struct X##Event __VA_ARGS__;
+#define DEF_DUR_EVENT(X, ...) struct X##Event __VA_ARGS__; struct X##FinishEvent __VA_ARGS__;
+
+DEF_EVENT(OpDispatch, {
+    uint64_t op_id;
+    std::string op_name;
+    std::function<OpParams()> op_params;
+    SmallVector<uint64_t> inputs;
+    SmallVector<uint64_t> outputs;
+});
+
+DEF_DUR_EVENT(OpInput, {
+    uint64_t tensor_id;
+    TensorShape shape;
+});
+
+DEF_DUR_EVENT(OpDel, {
+    uint64_t tensor_id;
+    TensorShape shape;
+});
+
+DEF_DUR_EVENT(OpOutput, {
+    uint64_t tensor_id;
+    TensorShape shape;
+});
+
+DEF_DUR_EVENT(OpExecute, {
+    uint64_t op_id;
+});
+
+DEF_DUR_EVENT(OpPostExecute, {
+    uint64_t op_id;
+});
+
+DEF_DUR_EVENT(KernelExecute, {
+    uint64_t op_id;
+    uint64_t kernel_id;
+    std::shared_ptr<CompNode::Event> event;
+});
+
+DEF_EVENT(TensorDeclare, {
+    uint64_t tensor_id;
+    std::string name;
+});
+
+DEF_EVENT(TensorProduce, {
+    uint64_t tensor_id;
+    TensorLayout layout;
+    CompNode device;
+    void* ptr;
+});
+
+DEF_EVENT(TensorUsage, {
+    uint64_t tensor_id;
+});
+
+DEF_EVENT(TensorRelease, {
+    uint64_t tensor_id;
+});
+
+DEF_EVENT(TensorErase, {
+    uint64_t tensor_id;
+    size_t use_count;
+});
+
+DEF_EVENT(TensorGetProp, {
+    uint64_t tensor_id;
+    TensorProp prop;
+});
+
+DEF_EVENT(TensorNotifyProp, {
+    uint64_t tensor_id;
+    uint64_t wait_id;
+    TensorProp prop;
+});
+
+DEF_EVENT(TensorWaitProp, {
+    uint64_t tensor_id;
+    uint64_t wait_id;
+    TensorProp prop;
+});
+
+DEF_EVENT(TensorWaitPropFinish, {
+    uint64_t tensor_id;
+    uint64_t wait_id;
+    TensorProp prop;
+    bool notified;
+});
+
+DEF_DUR_EVENT(SampleDevice, {
+    CompNode device;
+    size_t total_memory;
+    size_t free_memory;
+});
+
+DEF_EVENT(WorkerException, {});
+
+DEF_EVENT(ShapeInfer, {
+    bool success;
+});
+
+DEF_DUR_EVENT(Scope, {
+    std::string name;
+});
+
+DEF_DUR_EVENT(DeviceScope, {
+    std::string name;
+    std::shared_ptr<CompNode::Event> event;
+});
+
+DEF_DUR_EVENT(Sync, {});
+
+DEF_DUR_EVENT(StartProfile, {
+    size_t capture_count;
+});
+
+DEF_DUR_EVENT(StopProfile, {
+    size_t escape_count;
+});
+
+DEF_DUR_EVENT(TensorCommand, {
+    enum Kind {
+        Put, Del, SwapIn, SwapOut, Drop, ReGen, RecFree, GetValue
+    };
+    uint64_t tensor_id;
+    Kind kind;
+});
+
+#undef DEF_EVENT
+#undef DEF_DUR_EVENT
+
+}
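The DEF_EVENT/DEF_DUR_EVENT macros above stamp out plain structs, with the duration variant producing a paired start/finish type so that begin/end records share one field layout. Written out by hand, two representative expansions look like this:

```cpp
#include <string>

// DEF_EVENT(ShapeInfer, { bool success; }) expands to a single struct:
struct ShapeInferEvent {
    bool success;
};

// DEF_DUR_EVENT(Scope, { std::string name; }) expands to a start/finish pair;
// a "duration" on the timeline is bracketed by one record of each type:
struct ScopeEvent {
    std::string name;
};
struct ScopeFinishEvent {
    std::string name;
};
```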
diff --git a/imperative/src/impl/interpreter/profiler.cpp b/imperative/src/impl/profiler/formats.h
similarity index 50%
rename from imperative/src/impl/interpreter/profiler.cpp
rename to imperative/src/impl/profiler/formats.h
index ebf8cbc9448e3456183091a75604e515f4ee7bea..05bbda7bf54864f80caffd4a169c52092c527dca 100644
--- a/imperative/src/impl/interpreter/profiler.cpp
+++ b/imperative/src/impl/profiler/formats.h
@@ -1,5 +1,5 @@
 /**
- * \file imperative/src/impl/interpreter/profiler.cpp
+ * \file imperative/src/impl/profiler/formats.h
  * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
  *
  * Copyright (c) 2014-2020 Megvii Inc. All rights reserved.
@@ -9,22 +9,12 @@
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  */
 
-#include "./profiler.h"
+#pragma once
 
-#include <sstream>
-#include <cinttypes>
+#include <string>
 
-#if defined(__unix__) || (defined(__APPLE__) && defined(__MACH__))
-#include <unistd.h>
-#elif defined(_WIN32)
-#include <windows.h>
-#else
-#error Unsupported platform
-#endif
-
-#include "../op_trait.h"
-
-namespace mgb::imperative::interpreter::intl {
+#include "megbrain/imperative/profiler.h"
 
+namespace mgb::imperative::profiler {
 
 }
diff --git a/imperative/src/impl/profiler/states.h b/imperative/src/impl/profiler/states.h
index ed3feb6c76168650e5688df8af32652e1b6c5fa2..bfdc2284020410b91f2d1a5fda94a3a34e15123b 100644
--- a/imperative/src/impl/profiler/states.h
+++ b/imperative/src/impl/profiler/states.h
@@ -6,6 +6,8 @@
 
 #include "megbrain/tensor.h"
 
+#include "./events.h"
+
 namespace mgb::imperative::profiler {
 
 struct ProfileDeviceState {
@@ -53,6 +55,7 @@ struct ProfileStaticsState {
 struct ProfileOperatorState {
     uint64_t id;
     std::string name;
+    OpParams params;
     SmallVector<uint64_t> inputs;
    SmallVector<uint64_t> outputs;
     CompNode device;
diff --git a/imperative/src/include/megbrain/imperative/interpreter.h b/imperative/src/include/megbrain/imperative/interpreter.h
index d9de1308560515bc4d0163e3cb68cfd044af5236..a4f29ff4f6fb401774baa0e9f36d5425ee7d8d6a 100644
--- a/imperative/src/include/megbrain/imperative/interpreter.h
+++ b/imperative/src/include/megbrain/imperative/interpreter.h
@@ -47,8 +47,8 @@ struct Interpreter {
     virtual size_t get_option(std::string name) = 0;
     virtual void set_option(std::string name, size_t value) = 0;
 
-    virtual void start_profile(std::unordered_map<std::string, int> option) = 0;
-    virtual void stop_profile(std::string basename, std::string format) = 0;
+    virtual void start_profile() = 0;
+    virtual void stop_profile() = 0;
 
     virtual void push_scope(std::string name) = 0;
     virtual void pop_scope(std::string name) = 0;
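The signature change above moves profiling options out of start_profile: options are installed globally before profiling begins, and dumping becomes a separate step decoupled from stop_profile. A free-standing sketch of the resulting call sequence (the driver type and empty bodies are placeholders, not MegEngine code):

```cpp
#include <cstdint>
#include <string>
#include <unordered_map>

using options_t = std::unordered_map<std::string, uint64_t>;

struct ProfilerDriver {
    static void load_options(options_t) { /* stash options globally */ }
    static void start_profile() { /* flip the profiling flag on */ }
    static void stop_profile() { /* flip the profiling flag off */ }
    static void dump_profile(const std::string& /*basename*/,
                             const std::string& /*format*/) { /* write files */ }
};

int main() {
    // Options first, then a parameterless start/stop pair.
    ProfilerDriver::load_options({{"sample_rate", 0}, {"profile_device", 1}});
    ProfilerDriver::start_profile();
    // ... run the workload to be profiled ...
    ProfilerDriver::stop_profile();
    // Dumping happens later, per format, decoupled from stop_profile().
    ProfilerDriver::dump_profile("profile/12345", "chrome_timeline.json");
}
```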
diff --git a/imperative/src/include/megbrain/imperative/profiler.h b/imperative/src/include/megbrain/imperative/profiler.h
index 9d2f516586bdd4b1fc0fcfb1b14469f407bcc828..64211da5f11f7121d060e12624968807c67883d5 100644
--- a/imperative/src/include/megbrain/imperative/profiler.h
+++ b/imperative/src/include/megbrain/imperative/profiler.h
@@ -17,6 +17,9 @@
 #include <chrono>
 #include <thread>
 #include <variant>
+#include <any>
+#include <functional>
+#include <typeindex>
 
 #include "megbrain/comp_node.h"
 #include "megbrain/graph/event.h"
@@ -29,165 +32,188 @@
 namespace mgb {
 namespace imperative {
 
-class DeviceTimer {
-public:
-    using SharedEvent = std::shared_ptr<CompNode::Event>;
-    DeviceTimer() = default;
-    SharedEvent get_device_time(CompNode device);
-    SmallVector<SharedEvent> get_all(SmallVector<CompNode> device_list);
-};
-
-class HostTimer {
+class Timer {
 public:
     void reset();
-    double get_msecs();
-    double get_started_at();
+    uint64_t get_nsecs();
+    uint64_t get_started_at();
+    static std::shared_ptr<CompNode::Event> record_event(CompNode device);
 private:
     decltype(std::chrono::steady_clock::now()) m_start;
-    double m_started_at;
+    uint64_t m_started_at;
 };
 
-class ProfilerBase {
+class Profiler {
 public:
-    using Host = std::thread::id;
-    using Device = CompNode;
-
-    struct HostInstant {
-        Host tid;
-        double time;
-
-        void wait() const {}
+    struct Record {
+        uint64_t id;
+        uint64_t time; // in ns
+        std::any data;
     };
-
-    struct DeviceInstant {
-        double before;
-        std::shared_ptr<CompNode::Event> event;
-        double after;
-
-        void wait() const {
-            event->host_wait();
-        }
+    enum Status: uint8_t {
+        Running = 0,
+        Recording = 1,
+        Collecting = 2,
     };
+    using ProfileCollector = std::function<void(std::thread::id, Record)>;
+    using option_t = uint64_t;
+    using options_t = std::unordered_map<std::string, option_t>;
+    using result_t = std::pair<std::thread::id, Record>;
+    using results_t = std::vector<result_t>;
+    using thread_dict_t = std::unordered_map<std::thread::id, std::string>;
+private:
+    std::thread::id m_thread_id;
+    std::vector<Record> m_records;
+    std::atomic<Status> m_status = Running;
+    uint64_t m_last_time = 0;
+    std::string m_thread_name;
+
+    static options_t sm_profile_options;
+    static std::mutex sm_mutex;
+    static std::unordered_map<std::thread::id, Profiler*> sm_profilers;
+    static Timer sm_timer;
+    static std::atomic_uint64_t sm_last_id;
+    static std::atomic_size_t sm_preferred_capacity;
+    static bool sm_profiling;
+    static constexpr bool sm_debug = false;
+    thread_local static std::unique_ptr<Profiler> tm_profiler;
+public:
+    Profiler() {
+        m_thread_id = std::this_thread::get_id();
+        MGB_LOCK_GUARD(sm_mutex);
+        if (sm_profilers.size() == 0) {
+            reset();
+        }
+        mgb_assert(sm_profilers.count(m_thread_id) == 0);
+        sm_profilers[m_thread_id] = this;
+    }
+    ~Profiler() {
+        MGB_LOCK_GUARD(sm_mutex);
+        mgb_assert(sm_profilers.count(m_thread_id) == 1);
+        sm_profilers.erase(m_thread_id);
+    }
+public:
+    static Profiler& get_instance() {
+        return *tm_profiler;
     }
 
-    using Instant = std::variant<HostInstant, DeviceInstant>;
+    static void reset() {
+        mgb_assert(sm_profilers.size() == 0, "profiler already running");
+        sm_timer.reset();
+    }
 
-    template <typename TEvent>
-    struct EventRecord {
-        Instant instant;
-        TEvent data;
+    static uint64_t next_id() {
+        return sm_last_id++;
+    }
 
-        const HostInstant& host() const {
-            return std::get<HostInstant>(instant);
+    template <typename T, typename... TArgs>
+    static uint64_t record(TArgs&&... args) {
+        auto& profiler = get_instance();
+        auto last_time = profiler.m_last_time;
+        if constexpr (sm_debug) {
+            Status expected = Running;
+            mgb_assert(profiler.m_status.compare_exchange_strong(expected, Recording));
         }
-
-        const DeviceInstant& device() const {
-            return std::get<DeviceInstant>(instant);
+        uint64_t id = next_id();
+        uint64_t time = sm_timer.get_nsecs();
+        time = std::max(time, last_time + 2000);
+        profiler.m_last_time = time;
+        profiler.m_records.push_back({id, time, T{std::forward<TArgs>(args)...}});
+        if constexpr (sm_debug) {
+            Status expected = Recording;
+            mgb_assert(profiler.m_status.compare_exchange_strong(expected, Running));
        }
+        return id;
+    }
 
-        void wait() const {
-            std::visit([&](const auto& instant){ instant.wait(); }, instant);
+    static results_t collect() {
+        MGB_LOCK_GUARD(sm_mutex);
+        if constexpr (sm_debug) {
+            for (auto&& [tid, profiler]: sm_profilers) {
+                Status expected = Running;
+                mgb_assert(profiler->m_status.compare_exchange_strong(expected, Collecting));
+            }
         }
-    };
-protected:
-    HostInstant record_host() {
-        return {std::this_thread::get_id(), m_host_timer.get_msecs()};
+        std::vector<std::pair<std::thread::id, Record>> profile_data;
+        for (auto&& [tid, profiler]: sm_profilers) {
+            sm_preferred_capacity = std::max(sm_preferred_capacity.load(), profiler->m_records.size());
+            for (auto& record: profiler->m_records) {
+                profile_data.push_back({tid, std::move(record)});
+            }
+            profiler->m_records.clear();
+            profiler->m_records.reserve(sm_preferred_capacity);
+        }
+        std::sort(profile_data.begin(), profile_data.end(), [](auto& lhs, auto& rhs){
+            return lhs.second.id < rhs.second.id;
+        });
+        if constexpr (sm_debug) {
+            for (auto&& [tid, profiler]: sm_profilers) {
+                Status expected = Collecting;
+                mgb_assert(profiler->m_status.compare_exchange_strong(expected, Running));
+            }
+        }
+        return profile_data;
     }
-    DeviceInstant record_device(Device device) {
-        auto before = m_host_timer.get_msecs();
-        auto event = m_device_timer.get_device_time(device);
-        auto after = m_host_timer.get_msecs();
-        return {before, event, after};
+
+    static option_t get_option(std::string key, option_t default_val) {
+        if (!sm_profile_options.count(key)) {
+            return default_val;
+        }
+        return sm_profile_options.at(key);
     }
-protected:
-    std::atomic_int64_t m_last_id = 0;
-    HostTimer m_host_timer;
-    DeviceTimer m_device_timer;
-    Spinlock m_lock;
-};
 
+    static void load_options(options_t options) {
+        sm_profile_options = std::move(options);
+    }
 
-template <typename... TEvents>
-class Profiler: public ProfilerBase {
-public:
-    using Record = std::variant<EventRecord<TEvents>...>;
-    using Mask = std::bitset<sizeof...(TEvents)>;
+    static options_t get_options() {
+        return sm_profile_options;
+    }
 
-    struct Data {
-        std::vector<Record> records;
-        double started_at;
-    };
+    static bool is_profiling() {
+        return sm_profiling;
+    }
 
-    template <typename TEvent, size_t index = 0>
-    static constexpr size_t index_of() {
-        if constexpr (index == std::variant_size_v<Record>) {
-            return index;
-        } else if constexpr (std::is_same_v<EventRecord<TEvent>, std::variant_alternative_t<index, Record>>) {
-            return index;
-        } else {
-            return index_of<TEvent, index + 1>();
-        }
-    };
+    static void start_profile() {
+        mgb_assert(!sm_profiling);
+        sm_profiling = true;
+    }
 
-    template <typename... TEvents2>
-    static Mask mask_of() {
-        return Mask{} | (Mask{}.set(index_of<TEvents2>()) |...);
+    static void stop_profile() {
+        mgb_assert(sm_profiling);
+        sm_profiling = false;
     }
 
-    enum Status {
-        NotStarted, Profiling, Stopped
-    };
+    static thread_dict_t get_thread_dict();
+
+    static void dump_profile(std::string basename, std::string format, results_t results, options_t options);
+};
+
+
+class ProfileDataCollector {
 public:
-    template <typename TEvent, typename... TArgs>
-    void record_host(TArgs&&... args) {
-        MGB_LOCK_GUARD(m_lock);
-        if (!m_event_mask.test(index_of<TEvent>())) {
-            return;
-        }
-        mgb_assert(m_status != Stopped, "record after stop");
-        auto instant = HostInstant{std::this_thread::get_id(), m_host_timer.get_msecs()};
-        m_record_list.emplace_back(EventRecord<TEvent>{std::move(instant), {std::forward<TArgs>(args)...}});
+    template <typename T>
+    using SubCollector = std::function<void(uint64_t, std::thread::id, uint64_t, T)>;
+private:
+    std::unordered_map<std::type_index, SubCollector<std::any>> m_collectors;
+public:
+    template <typename T>
+    ProfileDataCollector& handle(SubCollector<T> collector) {
+        auto erased = [collector](uint64_t id, std::thread::id tid, uint64_t time, std::any data){
+            collector(id, tid, time, std::any_cast<T>(std::move(data)));
+        };
+        m_collectors[typeid(T)] = erased;
+        return *this;
     }
 
-    template <typename TEvent, typename... TArgs>
-    void record_device(Device device, TArgs&&... args) {
-        MGB_LOCK_GUARD(m_lock);
-        if (!m_event_mask.test(index_of<TEvent>())) {
+    void operator()(uint64_t id, std::thread::id tid, uint64_t time, std::any event) {
+        std::type_index type = event.type();
+        if (m_collectors.count(type) == 0) {
             return;
         }
-        mgb_assert(m_status != Stopped, "record after stop");
-        auto before = m_host_timer.get_msecs();
-        auto event = m_device_timer.get_device_time(device);
-        auto after = m_host_timer.get_msecs();
-        auto instant = DeviceInstant{before, event, after};
-        m_record_list.emplace_back(EventRecord<TEvent>{std::move(instant), {std::forward<TArgs>(args)...}});
-    }
-    // unsafe
-    bool is_profiling() {
-        return m_status == Profiling;
-    }
-    void start(Mask mask) {
-        MGB_LOCK_GUARD(m_lock);
-        mgb_assert(m_status == NotStarted, "profiler already started");
-        m_status = Profiling;
-        m_event_mask = mask;
-        m_host_timer.reset();
-    }
-    Data stop() {
-        MGB_LOCK_GUARD(m_lock);
-        mgb_assert(m_status == Profiling, "profiler not active");
-        m_status = Stopped;
-        for (auto&& record: m_record_list) {
-            std::visit([&](const auto& record){
-                record.wait();
-            }, record);
-        }
-        auto records = std::move(m_record_list);
-        return { records, m_host_timer.get_started_at() };
+        auto& handler = m_collectors.at(type);
+        handler(id, tid, time, std::move(event));
     }
-protected:
-    std::vector<Record> m_record_list;
-    Mask m_event_mask;
-    std::atomic<Status> m_status = NotStarted;
 };
 
 } // namespace imperative
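Putting the new header together: each thread records type-erased events into its own thread-local buffer via Profiler::record<T>, collect() drains every buffer and globally orders the records by id, and ProfileDataCollector dispatches each std::any payload back to a typed handler. A usage sketch against the declarations in this diff (MyScopeEvent is an illustrative stand-in, not one of the real event types):

```cpp
#include <cstdint>
#include <iostream>
#include <string>
#include <thread>
#include <utility>

#include "megbrain/imperative/profiler.h"

struct MyScopeEvent {
    std::string name;
};

void example() {
    using namespace mgb::imperative;

    Profiler::start_profile();
    // Recording goes through the calling thread's thread-local buffer; the
    // returned id establishes a global order across threads.
    Profiler::record<MyScopeEvent>("forward");
    Profiler::stop_profile();

    // collect() drains all per-thread buffers and sorts records by id.
    auto results = Profiler::collect();

    // A collector routes each type-erased payload to its typed handler.
    ProfileDataCollector collector;
    collector.handle<MyScopeEvent>(
            [](uint64_t id, std::thread::id tid, uint64_t time, MyScopeEvent e) {
                std::cout << "record " << id << ": scope " << e.name
                          << " at " << time << " ns\n";
            });
    for (auto&& [tid, record] : results) {
        collector(record.id, tid, record.time, std::move(record.data));
    }
}
```

One design note grounded in the header itself: record() clamps each timestamp to at least 2000 ns after the thread's previous record, so records from one thread can never tie or invert even if the clock's resolution is coarse.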