提交 730ddc2d 编写于 作者: M Megvii Engine Team

perf(interpreter): improve interpreter performance

GitOrigin-RevId: 88f51d15f804bdf33e64f7591d84657ab6635571
上级 729242f9
......@@ -59,9 +59,9 @@ void BlobManagerImpl::alloc_direct(Blob* blob, size_t size) {
}
DeviceTensorND BlobManagerImpl::alloc_workspace_with_defrag(
CompNode cn, TensorLayout layout) {
CompNode cn, TensorLayout& layout) {
DeviceTensorND dev_tensor;
MGB_TRY { dev_tensor = alloc_workspace(cn, layout); }
MGB_TRY { return alloc_workspace(cn, layout); }
MGB_CATCH(MemAllocError&, {
mgb_log_warn("memory allocation failed for workspace; try defragmenting");
defrag(cn);
......@@ -149,7 +149,7 @@ struct BlobManagerStub : BlobManager {
void alloc_with_defrag(Blob* blob, size_t size) {
mgb_assert(0, "prohibited after global variable destruction");
};
DeviceTensorND alloc_workspace_with_defrag(CompNode cn, TensorLayout layout) {
DeviceTensorND alloc_workspace_with_defrag(CompNode cn, TensorLayout& layout) {
mgb_assert(0, "prohibited after global variable destruction");
};
void register_blob(Blob* blob) {
......
......@@ -51,7 +51,7 @@ public:
void alloc_with_defrag(Blob* blob, size_t size) override;
DeviceTensorND alloc_workspace_with_defrag(
CompNode cn, TensorLayout layout) override;
CompNode cn, TensorLayout& layout) override;
void register_blob(Blob* blob) override;
......
......@@ -156,9 +156,16 @@ TensorInfo* ChannelImpl::put_impl(const HostTensorND& value, bool no_cache) {
info->h_value = value;
info->desc.value = value.proxy_to_default_cpu();
}
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), Put{info, value, no_cache},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
Put{info, value, no_cache},
});
}
if (m_async_level == 0) {
sync_impl();
info->desc.comp_node.sync();
......@@ -205,8 +212,16 @@ void ChannelImpl::del_impl(Handle handle) {
mgb_assert(m_valid_handle.count(handle), "invalid handle: %p", handle);
auto* info = reinterpret_cast<TensorInfo*>(handle);
m_valid_handle.erase(handle);
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), Del{info}, get_channel_state().stack_manager.dump()});
{Profiler::next_id(), Del{info},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
Del{info},
});
}
}
void ChannelImpl::drop(Handle handle) {
......@@ -218,9 +233,16 @@ void ChannelImpl::drop(Handle handle) {
m_valid_handle.find(handle) != m_valid_handle.end(),
"invalid handle: %p", handle);
auto* info = reinterpret_cast<TensorInfo*>(handle);
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), Drop{info},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
Drop{info},
});
}
}
}
......@@ -317,29 +339,29 @@ void ChannelImpl::dispatch_kernel(
auto& state = get_channel_state();
auto& options = state.options;
auto name = op->trait()->make_name(*op);
auto _ = StackManager::Guard{name, &state.stack_manager};
auto [output_descs, validated] =
OpDef::infer_output_attrs_fallible(*op, input_descs);
MGB_RECORD_EVENT(ShapeInferEvent, validated);
ApplyOp cmd{Profiler::next_id(), std::move(op)};
cmd.validated = validated;
cmd.inputs = std::move(input_infos);
SmallVector<TensorInfo*> output_infos;
output_infos.reserve(output_descs.size());
uint64_t apply_id = Profiler::next_id();
outputs->reserve(output_descs.size());
for (int i = 0; i < output_descs.size(); ++i) {
auto&& desc = output_descs[i];
auto info = alloc();
init(info, desc);
init(info, std::move(desc));
// make sure desc's value is consistent with h_value
if (!info->desc.value.empty()) {
info->h_value = HostTensorND::make_proxy(desc.value)
.proxy_to_comp_node(desc.comp_node);
}
cmd.outputs.push_back(info);
output_infos.push_back(info);
outputs->push_back(reinterpret_cast<Handle>(info));
}
auto op_info_getter = [op = cmd.op] {
auto op_info_getter = [op] {
std::unordered_map<std::string, std::string> op_info;
auto props = OpDef::props(*op);
for (auto&& [key, value] : props) {
......@@ -347,12 +369,25 @@ void ChannelImpl::dispatch_kernel(
}
return op_info;
};
if (Profiler::is_profiling()) {
auto name = op->trait()->make_name(*op);
auto _ = StackManager::Guard{name, &state.stack_manager};
MGB_RECORD_EVENT(
OpDispatchEvent, cmd.id, name, op_info_getter, tinfo_to_tid(cmd.inputs),
tinfo_to_tid(cmd.outputs), state.stack_manager.dump());
OpDispatchEvent, apply_id, name, op_info_getter,
tinfo_to_tid(std::move(input_infos)),
tinfo_to_tid(std::move(output_infos)), state.stack_manager.dump());
m_worker.add_task(
{Profiler::next_id(), std::move(cmd),
{Profiler::next_id(),
ApplyOp{apply_id, std::move(op), std::move(input_infos),
std::move(output_infos), validated},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
ApplyOp{apply_id, std::move(op), std::move(input_infos),
std::move(output_infos), validated},
});
}
if (!validated && options.async_level == 1) {
sync_impl();
} else if (options.async_level == 0) {
......@@ -396,7 +431,7 @@ SmallVector<Handle> ChannelImpl::apply_op_impl(
SmallVector<TensorInfo*> input_infos;
SmallVector<LogicalTensorDesc> input_descs;
{
MGB_LOCK_GUARD(m_mutex);
MGB_LOCK_GUARD(m_info_spin);
for (auto i : inputs) {
auto info = reinterpret_cast<TensorInfo*>(i);
mgb_assert(
......@@ -526,9 +561,16 @@ void ChannelImpl::set_option(std::string name, size_t value) {
mgb_assert(check_available(), "Channel already closed");
auto& state = get_channel_state();
state.options.set_option(name, value);
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), SetOption{name, value},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
SetOption{name, value},
});
}
}
void ChannelImpl::clear_candidates() {
......@@ -540,8 +582,10 @@ void ChannelImpl::clear_candidates() {
TensorInfo* ChannelImpl::alloc() {
auto& state = get_channel_state();
auto info = [this] {
MGB_LOCK_GUARD(m_mutex);
return m_pool.alloc();
MGB_LOCK_GUARD(m_pool_spin);
auto* ptr = m_pool.alloc_raw();
new (ptr) TensorInfo();
return (TensorInfo*)ptr;
}();
info->id = Profiler::next_id();
if (Profiler::is_profiling()) {
......@@ -552,11 +596,11 @@ TensorInfo* ChannelImpl::alloc() {
return info;
}
void ChannelImpl::init(TensorInfo* info, LogicalTensorDesc desc) {
void ChannelImpl::init(TensorInfo* info, LogicalTensorDesc&& desc) {
m_valid_handle.insert(reinterpret_cast<Handle>(info));
MGB_RECORD_EVENT(TensorDeclareEvent, info->id, info->name);
info->status = TensorInfo::Allocated;
info->desc = std::move(desc);
info->desc = desc;
}
void ChannelImpl::do_drop(TensorInfo* ptr, bool user = false) {
......@@ -626,7 +670,7 @@ void ChannelImpl::real_free(TensorInfo* ptr) {
}
MGB_RECORD_EVENT(TensorEraseEvent, ptr->id, ptr->ptr_use_count);
ptr->status = TensorInfo::Deleted;
MGB_LOCK_GUARD(m_mutex);
MGB_LOCK_GUARD(m_pool_spin);
m_pool.free(ptr);
}
......@@ -705,21 +749,20 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd, std::string reason) {
auto_evict(0);
}
auto apply_on_physical_tensor =
[&](auto&& self, const OpDef& def, SmallVector<TensorPtr> inputs,
[&](auto&& self, const OpDef& def, SmallVector<TensorPtr>&& inputs,
SmallVector<LogicalTensorDesc>& output_descs,
const bool& validated) -> SmallVector<TensorPtr> {
if (def.trait()->make_forward_graph) {
auto apply_functor = [&](std::shared_ptr<OpDef> op,
SmallVector<TensorPtr> inputs,
size_t nr_outputs) -> SmallVector<TensorPtr> {
auto opname = op->trait()->make_name(*op);
imperative_log_profile_begin(opname.c_str());
// do not use infered output_desc in subgraph
auto outputs = self(self, *op, inputs, output_descs, false);
auto outputs = self(self, *op, std::move(inputs), output_descs, false);
imperative_log_profile_end(opname.c_str());
return outputs;
};
auto const_functor = [&](TensorPtr value) -> TensorPtr { return value; };
if (def.trait()->make_forward_graph) {
// apply recursivily
SmallVector<LogicalTensorDesc> input_descs;
for (auto&& input : inputs) {
......@@ -767,8 +810,7 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd, std::string reason) {
for (auto&& [device, kernel_id] : kernels) {
MGB_RECORD_EVENT(KernelLaunchEvent, apply_id, kernel_id, device);
MGB_RECORD_EVENT_IF(
(Profiler::get_option("profile_device", 0)), RecordDeviceEvent,
Timer::record_device(device));
profiling_device, RecordDeviceEvent, Timer::record_device(device));
}
// Apply op
SmallVector<LogicalTensorDesc> output_descs;
......@@ -777,31 +819,33 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd, std::string reason) {
}
// Here std::move is REQUIRED for removing duplicated references.
auto outputs = apply_on_physical_tensor(
apply_on_physical_tensor, *cmd.op, inputs, output_descs, cmd.validated);
apply_on_physical_tensor, *cmd.op, std::move(inputs), output_descs,
cmd.validated);
// After execute
for (auto&& [device, kernel_id] : kernels) {
MGB_RECORD_EVENT_IF(
(Profiler::get_option("profile_device", 0)), RecordDeviceEvent,
Timer::record_device(device));
profiling_device, RecordDeviceEvent, Timer::record_device(device));
MGB_RECORD_EVENT(KernelLaunchFinishEvent, apply_id, kernel_id, device);
}
// End profiling operator
mgb_assert(outputs.size() == cmd.outputs.size());
for (size_t i = 0; i < outputs.size(); ++i) {
auto output = cmd.outputs[i];
if (output == nullptr) {
if (mgb_unlikely(output == nullptr)) {
MGB_RECORD_EVENT(OpOutputEvent, 0);
MGB_RECORD_EVENT(OpOutputFinishEvent, 0);
} else if (output->ptr != nullptr) {
} else if (mgb_unlikely(output->ptr != nullptr)) {
MGB_RECORD_EVENT(OpOutputEvent, output->id);
MGB_RECORD_EVENT(OpOutputFinishEvent, output->id);
} else {
MGB_RECORD_EVENT(OpOutputEvent, output->id);
produce_tensor(output, outputs[i]);
MGB_RECORD_EVENT(OpOutputFinishEvent, output->id);
if (Profiler::is_profiling()) {
sample_on_device(output->desc.comp_node, false);
}
}
}
if (state.options.enable_dtr_auto_drop) {
double estimate_compute_time = 0;
......@@ -946,9 +990,16 @@ TensorPtr ChannelImpl::wait_tensor(TensorInfo* info, TensorProp prop) {
if (require_host && !host_available()) {
// avoid dead lock
lock.unlock();
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), GetValue{info},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
GetValue{info},
});
}
lock.lock();
wait_host = true;
}
......@@ -1045,7 +1096,7 @@ void ChannelImpl::process_one_task(Command& icmd) {
sample_on_device(cmd.dest->desc.comp_node, false);
} else if constexpr (std::is_same_v<T, ApplyOp>) {
for (auto& i : cmd.inputs) {
if (i->invalid) {
if (mgb_unlikely(i->invalid)) {
MGB_LOCK_GUARD(m_mutex);
for (auto& i : cmd.outputs) {
i->invalid = true;
......@@ -1053,6 +1104,7 @@ void ChannelImpl::process_one_task(Command& icmd) {
return;
}
}
if (state.options.enable_dtr_auto_drop) {
m_apply_stack.push({cmd, 0, nullptr, "cmd"});
flush_apply_stack();
for (size_t i = 0; i < cmd.outputs.size(); ++i) {
......@@ -1060,9 +1112,10 @@ void ChannelImpl::process_one_task(Command& icmd) {
if (output == nullptr) {
continue;
}
if (state.options.enable_dtr_auto_drop) {
output->dsu_ptr = std::make_shared<DsuNode>(output->compute_time);
}
} else {
do_apply_op(cmd, "cmd");
}
if (state.options.enable_drop && state.options.record_computing_path) {
auto is_inplace = [](std::tuple<TensorInfo*, TensorInfo*> tuple2) {
......@@ -1229,9 +1282,16 @@ void ChannelImpl::start_profile() {
mgb_assert(check_available(), "Channel already closed");
auto capture_tensors = collect_valid_tensors();
if (capture_tensors.size() > 0) {
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), StartProfile{std::move(capture_tensors)},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
StartProfile{std::move(capture_tensors)},
});
}
}
}
......@@ -1240,9 +1300,16 @@ void ChannelImpl::stop_profile() {
mgb_assert(check_available(), "Channel already closed");
auto escape_tensors = collect_valid_tensors();
if (escape_tensors.size() > 0) {
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), StopProfile{std::move(escape_tensors)},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
StopProfile{std::move(escape_tensors)},
});
}
}
}
......@@ -1252,9 +1319,16 @@ void ChannelImpl::push_scope(std::string name) {
auto& state = get_channel_state();
state.stack_manager.enter(name);
MGB_RECORD_EVENT(ScopeEvent, name);
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), PushScope{name},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
PushScope{name},
});
}
}
void ChannelImpl::pop_scope(std::string name) {
......@@ -1263,9 +1337,16 @@ void ChannelImpl::pop_scope(std::string name) {
auto& state = get_channel_state();
state.stack_manager.exit(name);
MGB_RECORD_EVENT(ScopeFinishEvent, name);
if (Profiler::is_profiling()) {
m_worker.add_task(
{Profiler::next_id(), PopScope{name},
get_channel_state().stack_manager.dump()});
} else {
m_worker.add_task({
Profiler::next_id(),
PopScope{name},
});
}
}
void ChannelImpl::assert_in_channel() {
......@@ -1281,10 +1362,12 @@ void ChannelImpl::assert_in_worker() {
}
void ChannelImpl::sample_on_device(CompNode device, bool force) {
if (!Profiler::is_profiling()) {
return;
}
if (!force) {
thread_local int last_sample_id = 0;
int sample_rate =
Profiler::is_profiling() ? Profiler::get_option("sample_rate", 0) : 0;
int sample_rate = Profiler::get_option("sample_rate", 0);
if (!sample_rate || ((++last_sample_id) % sample_rate != 0)) {
return;
}
......
......@@ -77,7 +77,7 @@ private:
struct State;
TensorInfo* alloc();
void init(TensorInfo*, LogicalTensorDesc desc);
void init(TensorInfo*, LogicalTensorDesc&& desc);
void free(TensorInfo*);
void real_free(TensorInfo*);
void recursive_free(TensorInfo*);
......@@ -132,6 +132,8 @@ private:
MemPool<TensorInfo> m_pool;
std::unordered_set<Handle> m_valid_handle;
TensorInfo* m_waitee = nullptr;
Spinlock m_pool_spin;
Spinlock m_info_spin;
uint64_t m_waitee_id = 0;
std::exception_ptr m_worker_exc;
std::function<void(std::string, std::string)> m_profile_dump_callback;
......
......@@ -39,7 +39,7 @@ DispatchMode OpDef::decide_dispatch_mode(
}
SmallVector<TensorPtr> OpDef::apply_on_physical_tensor(
const OpDef& def, SmallVector<TensorPtr> inputs,
const OpDef& def, const SmallVector<TensorPtr>& inputs,
SmallVector<LogicalTensorDesc>& output_descs, const bool& validated) {
return def.trait()->apply_on_physical_tensor(
def, std::move(inputs), output_descs, validated);
......
......@@ -160,7 +160,7 @@ struct OpMeth<Tag, RType(Args...)> : public thin_function<RType(Args...)> {
}
return false;
};
while (!this->Base::operator bool()) {
while (mgb_unlikely(!this->Base::operator bool())) {
using Mode = OpMethFallbackMode;
if (match_mode(Mode::FromSubgraph)) {
OpMethFallbackFromSubgraph::impl(*const_cast<OpMeth*>(this), Tag{});
......
......@@ -27,7 +27,7 @@ public:
virtual void alloc_with_defrag(Blob* blob, size_t size) = 0;
virtual DeviceTensorND alloc_workspace_with_defrag(
CompNode cn, TensorLayout layout) = 0;
CompNode cn, TensorLayout& layout) = 0;
virtual void register_blob(Blob* blob) = 0;
......
......@@ -51,7 +51,7 @@ public:
const OpDef& def, const SmallVector<LogicalTensorDesc>& inputs);
static SmallVector<TensorPtr> apply_on_physical_tensor(
const OpDef& def, SmallVector<TensorPtr> inputs,
const OpDef& def, const SmallVector<TensorPtr>& inputs,
SmallVector<LogicalTensorDesc>& output_descs, const bool& validated);
/*!
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册