From 730ddc2d8173e272f370c5e479119f61e95a41e6 Mon Sep 17 00:00:00 2001
From: Megvii Engine Team
Date: Wed, 9 Mar 2022 17:15:32 +0800
Subject: [PATCH] perf(interpreter): improve interpreter performance

Cut per-task overhead on the hot dispatch path:

* compute get_channel_state().stack_manager.dump() only when the profiler
  is active: every worker task (Put, Del, Drop, ApplyOp, SetOption,
  GetValue, StartProfile, StopProfile, PushScope, PopScope) now has a
  fast path that skips the backtrace
* guard the TensorInfo pool with a dedicated m_pool_spin and the
  input-info lookup in apply_op_impl with m_info_spin instead of the
  channel-wide m_mutex
* pass TensorLayout by reference in alloc_workspace_with_defrag and
  inputs by const reference in OpDef::apply_on_physical_tensor
* annotate cold branches with mgb_unlikely and return early from
  sample_on_device when not profiling

GitOrigin-RevId: 88f51d15f804bdf33e64f7591d84657ab6635571
---
 imperative/src/impl/blob_manager_impl.cpp     |   6 +-
 imperative/src/impl/blob_manager_impl.h       |   2 +-
 .../src/impl/interpreter/interpreter_impl.cpp | 239 ++++++++++++------
 .../src/impl/interpreter/interpreter_impl.h   |   4 +-
 imperative/src/impl/op_def.cpp                |   2 +-
 imperative/src/impl/op_trait.h                |   2 +-
 .../megbrain/imperative/blob_manager.h        |   2 +-
 .../src/include/megbrain/imperative/op_def.h  |   2 +-
 8 files changed, 172 insertions(+), 87 deletions(-)

diff --git a/imperative/src/impl/blob_manager_impl.cpp b/imperative/src/impl/blob_manager_impl.cpp
index 88ba3106..a1a229fb 100644
--- a/imperative/src/impl/blob_manager_impl.cpp
+++ b/imperative/src/impl/blob_manager_impl.cpp
@@ -59,9 +59,9 @@ void BlobManagerImpl::alloc_direct(Blob* blob, size_t size) {
 }
 
 DeviceTensorND BlobManagerImpl::alloc_workspace_with_defrag(
-        CompNode cn, TensorLayout layout) {
+        CompNode cn, TensorLayout& layout) {
     DeviceTensorND dev_tensor;
-    MGB_TRY { dev_tensor = alloc_workspace(cn, layout); }
+    MGB_TRY { return alloc_workspace(cn, layout); }
     MGB_CATCH(MemAllocError&, {
         mgb_log_warn("memory allocation failed for workspace; try defragmenting");
         defrag(cn);
@@ -149,7 +149,7 @@ struct BlobManagerStub : BlobManager {
     void alloc_with_defrag(Blob* blob, size_t size) {
         mgb_assert(0, "prohibited after global variable destruction");
     };
-    DeviceTensorND alloc_workspace_with_defrag(CompNode cn, TensorLayout layout) {
+    DeviceTensorND alloc_workspace_with_defrag(CompNode cn, TensorLayout& layout) {
         mgb_assert(0, "prohibited after global variable destruction");
     };
     void register_blob(Blob* blob) {
diff --git a/imperative/src/impl/blob_manager_impl.h b/imperative/src/impl/blob_manager_impl.h
index 27b28ae4..3108fd4c 100644
--- a/imperative/src/impl/blob_manager_impl.h
+++ b/imperative/src/impl/blob_manager_impl.h
@@ -51,7 +51,7 @@ public:
     void alloc_with_defrag(Blob* blob, size_t size) override;
 
     DeviceTensorND alloc_workspace_with_defrag(
-            CompNode cn, TensorLayout layout) override;
+            CompNode cn, TensorLayout& layout) override;
 
     void register_blob(Blob* blob) override;
 
diff --git a/imperative/src/impl/interpreter/interpreter_impl.cpp b/imperative/src/impl/interpreter/interpreter_impl.cpp
index 95b6d93d..c3ed10d7 100644
--- a/imperative/src/impl/interpreter/interpreter_impl.cpp
+++ b/imperative/src/impl/interpreter/interpreter_impl.cpp
@@ -156,9 +156,16 @@ TensorInfo* ChannelImpl::put_impl(const HostTensorND& value, bool no_cache) {
         info->h_value = value;
         info->desc.value = value.proxy_to_default_cpu();
     }
-    m_worker.add_task(
-            {Profiler::next_id(), Put{info, value, no_cache},
-             get_channel_state().stack_manager.dump()});
+    if (Profiler::is_profiling()) {
+        m_worker.add_task(
+                {Profiler::next_id(), Put{info, value, no_cache},
+                 get_channel_state().stack_manager.dump()});
+    } else {
+        m_worker.add_task({
+                Profiler::next_id(),
+                Put{info, value, no_cache},
+        });
+    }
     if (m_async_level == 0) {
         sync_impl();
         info->desc.comp_node.sync();
@@ -205,8 +212,16 @@ void ChannelImpl::del_impl(Handle handle) {
     mgb_assert(m_valid_handle.count(handle), "invalid handle: %p", handle);
     auto* info = reinterpret_cast<TensorInfo*>(handle);
     m_valid_handle.erase(handle);
-    m_worker.add_task(
-            {Profiler::next_id(), Del{info}, get_channel_state().stack_manager.dump()});
+    if (Profiler::is_profiling()) {
+        m_worker.add_task(
+                {Profiler::next_id(), Del{info},
+                 get_channel_state().stack_manager.dump()});
+    } else {
+        m_worker.add_task({
+                Profiler::next_id(),
+                Del{info},
+        });
+    }
 }
 
 void ChannelImpl::drop(Handle handle) {
@@ -218,9 +233,16 @@ void ChannelImpl::drop(Handle handle) {
                 m_valid_handle.find(handle) != m_valid_handle.end(),
                 "invalid handle: %p", handle);
         auto* info = reinterpret_cast<TensorInfo*>(handle);
-        m_worker.add_task(
-                {Profiler::next_id(), Drop{info},
-                 get_channel_state().stack_manager.dump()});
+        if (Profiler::is_profiling()) {
+            m_worker.add_task(
+                    {Profiler::next_id(), Drop{info},
+                     get_channel_state().stack_manager.dump()});
+        } else {
+            m_worker.add_task({
+                    Profiler::next_id(),
+                    Drop{info},
+            });
+        }
     }
 }
 
@@ -317,29 +339,29 @@ void ChannelImpl::dispatch_kernel(
     auto& state = get_channel_state();
     auto& options = state.options;
 
-    auto name = op->trait()->make_name(*op);
-    auto _ = StackManager::Guard{name, &state.stack_manager};
-
     auto [output_descs, validated] =
             OpDef::infer_output_attrs_fallible(*op, input_descs);
     MGB_RECORD_EVENT(ShapeInferEvent, validated);
 
-    ApplyOp cmd{Profiler::next_id(), std::move(op)};
-    cmd.validated = validated;
-    cmd.inputs = std::move(input_infos);
+    SmallVector<TensorInfo*> output_infos;
+    output_infos.reserve(output_descs.size());
+    uint64_t apply_id = Profiler::next_id();
+
+    outputs->reserve(output_descs.size());
     for (int i = 0; i < output_descs.size(); ++i) {
         auto&& desc = output_descs[i];
         auto info = alloc();
-        init(info, desc);
+        init(info, std::move(desc));
         // make sure desc's value is consistent with h_value
         if (!info->desc.value.empty()) {
            info->h_value = HostTensorND::make_proxy(desc.value)
                                    .proxy_to_comp_node(desc.comp_node);
         }
-        cmd.outputs.push_back(info);
+        output_infos.push_back(info);
         outputs->push_back(reinterpret_cast<Handle>(info));
     }
-    auto op_info_getter = [op = cmd.op] {
+    auto op_info_getter = [op] {
         std::unordered_map<std::string, std::string> op_info;
         auto props = OpDef::props(*op);
         for (auto&& [key, value] : props) {
@@ -347,12 +369,25 @@ void ChannelImpl::dispatch_kernel(
         }
         return op_info;
     };
-    MGB_RECORD_EVENT(
-            OpDispatchEvent, cmd.id, name, op_info_getter, tinfo_to_tid(cmd.inputs),
-            tinfo_to_tid(cmd.outputs), state.stack_manager.dump());
-    m_worker.add_task(
-            {Profiler::next_id(), std::move(cmd),
-             get_channel_state().stack_manager.dump()});
+    if (Profiler::is_profiling()) {
+        auto name = op->trait()->make_name(*op);
+        auto _ = StackManager::Guard{name, &state.stack_manager};
+        MGB_RECORD_EVENT(
+                OpDispatchEvent, apply_id, name, op_info_getter,
+                tinfo_to_tid(input_infos), tinfo_to_tid(output_infos),
+                state.stack_manager.dump());
+        m_worker.add_task(
+                {Profiler::next_id(),
+                 ApplyOp{apply_id, std::move(op), std::move(input_infos),
+                         std::move(output_infos), validated},
+                 get_channel_state().stack_manager.dump()});
+    } else {
+        m_worker.add_task({
+                Profiler::next_id(),
+                ApplyOp{apply_id, std::move(op), std::move(input_infos),
+                        std::move(output_infos), validated},
+        });
+    }
     if (!validated && options.async_level == 1) {
         sync_impl();
     } else if (options.async_level == 0) {
@@ -396,7 +431,7 @@ SmallVector<Handle> ChannelImpl::apply_op_impl(
     SmallVector<TensorInfo*> input_infos;
     SmallVector<LogicalTensorDesc> input_descs;
     {
-        MGB_LOCK_GUARD(m_mutex);
+        MGB_LOCK_GUARD(m_info_spin);
         for (auto i : inputs) {
             auto info = reinterpret_cast<TensorInfo*>(i);
             mgb_assert(
@@ -526,9 +561,16 @@ void ChannelImpl::set_option(std::string name, size_t value) {
     mgb_assert(check_available(), "Channel already closed");
     auto& state = get_channel_state();
     state.options.set_option(name, value);
-    m_worker.add_task(
-            {Profiler::next_id(), SetOption{name, value},
-             get_channel_state().stack_manager.dump()});
+    if (Profiler::is_profiling()) {
+        m_worker.add_task(
+                {Profiler::next_id(), SetOption{name, value},
+                 get_channel_state().stack_manager.dump()});
+    } else {
+        m_worker.add_task({
+                Profiler::next_id(),
+                SetOption{name, value},
+        });
+    }
 }
 
 void ChannelImpl::clear_candidates() {
@@ -540,8 +582,10 @@ TensorInfo* ChannelImpl::alloc() {
     auto& state = get_channel_state();
     auto info = [this] {
-        MGB_LOCK_GUARD(m_mutex);
-        return m_pool.alloc();
+        MGB_LOCK_GUARD(m_pool_spin);
+        auto* ptr = m_pool.alloc_raw();
+        new (ptr) TensorInfo();
+        return (TensorInfo*)ptr;
     }();
     info->id = Profiler::next_id();
     if (Profiler::is_profiling()) {
@@ -552,11 +596,11 @@ TensorInfo* ChannelImpl::alloc() {
     return info;
 }
 
-void ChannelImpl::init(TensorInfo* info, LogicalTensorDesc desc) {
+void ChannelImpl::init(TensorInfo* info, LogicalTensorDesc&& desc) {
     m_valid_handle.insert(reinterpret_cast<Handle>(info));
     MGB_RECORD_EVENT(TensorDeclareEvent, info->id, info->name);
     info->status = TensorInfo::Allocated;
-    info->desc = std::move(desc);
+    info->desc = desc;
 }
 
 void ChannelImpl::do_drop(TensorInfo* ptr, bool user = false) {
@@ -626,7 +670,7 @@ void ChannelImpl::real_free(TensorInfo* ptr) {
     }
     MGB_RECORD_EVENT(TensorEraseEvent, ptr->id, ptr->ptr_use_count);
     ptr->status = TensorInfo::Deleted;
-    MGB_LOCK_GUARD(m_mutex);
+    MGB_LOCK_GUARD(m_pool_spin);
     m_pool.free(ptr);
 }
 
@@ -705,21 +749,20 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd, std::string reason) {
         auto_evict(0);
     }
     auto apply_on_physical_tensor =
-            [&](auto&& self, const OpDef& def, SmallVector<TensorPtr> inputs,
+            [&](auto&& self, const OpDef& def, SmallVector<TensorPtr>&& inputs,
                 SmallVector<LogicalTensorDesc>& output_descs,
                 const bool& validated) -> SmallVector<TensorPtr> {
-        auto apply_functor = [&](std::shared_ptr<OpDef> op,
-                                 SmallVector<TensorPtr> inputs,
-                                 size_t nr_outputs) -> SmallVector<TensorPtr> {
-            auto opname = op->trait()->make_name(*op);
-            imperative_log_profile_begin(opname.c_str());
-            // do not use inferred output_desc in subgraph
-            auto outputs = self(self, *op, inputs, output_descs, false);
-            imperative_log_profile_end(opname.c_str());
-            return outputs;
-        };
-        auto const_functor = [&](TensorPtr value) -> TensorPtr { return value; };
         if (def.trait()->make_forward_graph) {
+            auto apply_functor = [&](std::shared_ptr<OpDef> op,
+                                     SmallVector<TensorPtr> inputs,
+                                     size_t nr_outputs) -> SmallVector<TensorPtr> {
+                auto opname = op->trait()->make_name(*op);
+                imperative_log_profile_begin(opname.c_str());
+                auto outputs = self(self, *op, std::move(inputs), output_descs, false);
+                imperative_log_profile_end(opname.c_str());
+                return outputs;
+            };
+            auto const_functor = [&](TensorPtr value) -> TensorPtr { return value; };
             // apply recursively
             SmallVector<LogicalTensorDesc> input_descs;
             for (auto&& input : inputs) {
@@ -767,8 +810,7 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd, std::string reason) {
     for (auto&& [device, kernel_id] : kernels) {
         MGB_RECORD_EVENT(KernelLaunchEvent, apply_id, kernel_id, device);
         MGB_RECORD_EVENT_IF(
-                (Profiler::get_option("profile_device", 0)), RecordDeviceEvent,
-                Timer::record_device(device));
+                profiling_device, RecordDeviceEvent, Timer::record_device(device));
     }
     // Apply op
     SmallVector<LogicalTensorDesc> output_descs;
     for (auto i : cmd.outputs) {
         output_descs.push_back(i->desc);
     }
     // Here std::move is REQUIRED for removing duplicated references.
     auto outputs = apply_on_physical_tensor(
-            apply_on_physical_tensor, *cmd.op, inputs, output_descs, cmd.validated);
+            apply_on_physical_tensor, *cmd.op, std::move(inputs), output_descs,
+            cmd.validated);
     // After execute
     for (auto&& [device, kernel_id] : kernels) {
         MGB_RECORD_EVENT_IF(
-                (Profiler::get_option("profile_device", 0)), RecordDeviceEvent,
-                Timer::record_device(device));
+                profiling_device, RecordDeviceEvent, Timer::record_device(device));
         MGB_RECORD_EVENT(KernelLaunchFinishEvent, apply_id, kernel_id, device);
     }
     // End profiling operator
     mgb_assert(outputs.size() == cmd.outputs.size());
     for (size_t i = 0; i < outputs.size(); ++i) {
         auto output = cmd.outputs[i];
-        if (output == nullptr) {
+        if (mgb_unlikely(output == nullptr)) {
             MGB_RECORD_EVENT(OpOutputEvent, 0);
             MGB_RECORD_EVENT(OpOutputFinishEvent, 0);
-        } else if (output->ptr != nullptr) {
+        } else if (mgb_unlikely(output->ptr != nullptr)) {
             MGB_RECORD_EVENT(OpOutputEvent, output->id);
             MGB_RECORD_EVENT(OpOutputFinishEvent, output->id);
         } else {
             MGB_RECORD_EVENT(OpOutputEvent, output->id);
             produce_tensor(output, outputs[i]);
             MGB_RECORD_EVENT(OpOutputFinishEvent, output->id);
-            sample_on_device(output->desc.comp_node, false);
+            if (Profiler::is_profiling()) {
+                sample_on_device(output->desc.comp_node, false);
+            }
         }
     }
 
@@ -946,9 +990,16 @@ TensorPtr ChannelImpl::wait_tensor(TensorInfo* info, TensorProp prop) {
     if (require_host && !host_available()) {
         // avoid dead lock
         lock.unlock();
-        m_worker.add_task(
-                {Profiler::next_id(), GetValue{info},
-                 get_channel_state().stack_manager.dump()});
+        if (Profiler::is_profiling()) {
+            m_worker.add_task(
+                    {Profiler::next_id(), GetValue{info},
+                     get_channel_state().stack_manager.dump()});
+        } else {
+            m_worker.add_task({
+                    Profiler::next_id(),
+                    GetValue{info},
+            });
+        }
         lock.lock();
         wait_host = true;
     }
@@ -1045,7 +1096,7 @@ void ChannelImpl::process_one_task(Command& icmd) {
                sample_on_device(cmd.dest->desc.comp_node, false);
         } else if constexpr (std::is_same_v<T, ApplyOp>) {
             for (auto& i : cmd.inputs) {
-                if (i->invalid) {
+                if (mgb_unlikely(i->invalid)) {
                     MGB_LOCK_GUARD(m_mutex);
                     for (auto& i : cmd.outputs) {
                         i->invalid = true;
                     }
                     return;
                 }
             }
-            m_apply_stack.push({cmd, 0, nullptr, "cmd"});
-            flush_apply_stack();
-            for (size_t i = 0; i < cmd.outputs.size(); ++i) {
-                auto output = cmd.outputs[i];
-                if (output == nullptr) {
-                    continue;
-                }
-                if (state.options.enable_dtr_auto_drop) {
+            if (state.options.enable_dtr_auto_drop) {
+                m_apply_stack.push({cmd, 0, nullptr, "cmd"});
+                flush_apply_stack();
+                for (size_t i = 0; i < cmd.outputs.size(); ++i) {
+                    auto output = cmd.outputs[i];
+                    if (output == nullptr) {
+                        continue;
+                    }
                     output->dsu_ptr = std::make_shared<DsuNode>(output->compute_time);
                 }
+            } else {
+                do_apply_op(cmd, "cmd");
             }
             if (state.options.enable_drop && state.options.record_computing_path) {
                 auto is_inplace = [](std::tuple<TensorPtr, TensorPtr> tuple2) {
@@ -1229,9 +1282,16 @@ void ChannelImpl::start_profile() {
     mgb_assert(check_available(), "Channel already closed");
     auto capture_tensors = collect_valid_tensors();
     if (capture_tensors.size() > 0) {
-        m_worker.add_task(
-                {Profiler::next_id(), StartProfile{std::move(capture_tensors)},
-                 get_channel_state().stack_manager.dump()});
+        if (Profiler::is_profiling()) {
+            m_worker.add_task(
+                    {Profiler::next_id(), StartProfile{std::move(capture_tensors)},
+                     get_channel_state().stack_manager.dump()});
+        } else {
+            m_worker.add_task({
+                    Profiler::next_id(),
+                    StartProfile{std::move(capture_tensors)},
+            });
+        }
     }
 }
 
@@ -1240,9 +1300,16 @@ void ChannelImpl::stop_profile() {
     mgb_assert(check_available(), "Channel already closed");
     auto escape_tensors = collect_valid_tensors();
     if (escape_tensors.size() > 0) {
-        m_worker.add_task(
-                {Profiler::next_id(), StopProfile{std::move(escape_tensors)},
-                 get_channel_state().stack_manager.dump()});
+        if (Profiler::is_profiling()) {
+            m_worker.add_task(
+                    {Profiler::next_id(), StopProfile{std::move(escape_tensors)},
+                     get_channel_state().stack_manager.dump()});
+        } else {
+            m_worker.add_task({
+                    Profiler::next_id(),
+                    StopProfile{std::move(escape_tensors)},
+            });
+        }
     }
 }
 
@@ -1252,9 +1319,16 @@ void ChannelImpl::push_scope(std::string name) {
     mgb_assert(check_available(), "Channel already closed");
     auto& state = get_channel_state();
     state.stack_manager.enter(name);
     MGB_RECORD_EVENT(ScopeEvent, name);
-    m_worker.add_task(
-            {Profiler::next_id(), PushScope{name},
-             get_channel_state().stack_manager.dump()});
+    if (Profiler::is_profiling()) {
+        m_worker.add_task(
+                {Profiler::next_id(), PushScope{name},
+                 get_channel_state().stack_manager.dump()});
+    } else {
+        m_worker.add_task({
+                Profiler::next_id(),
+                PushScope{name},
+        });
+    }
 }
 
 void ChannelImpl::pop_scope(std::string name) {
@@ -1263,9 +1337,16 @@ void ChannelImpl::pop_scope(std::string name) {
     auto& state = get_channel_state();
     state.stack_manager.exit(name);
     MGB_RECORD_EVENT(ScopeFinishEvent, name);
-    m_worker.add_task(
-            {Profiler::next_id(), PopScope{name},
-             get_channel_state().stack_manager.dump()});
+    if (Profiler::is_profiling()) {
+        m_worker.add_task(
+                {Profiler::next_id(), PopScope{name},
+                 get_channel_state().stack_manager.dump()});
+    } else {
+        m_worker.add_task({
+                Profiler::next_id(),
+                PopScope{name},
+        });
+    }
 }
 
 void ChannelImpl::assert_in_channel() {
@@ -1281,10 +1362,12 @@ void ChannelImpl::assert_in_worker() {
 }
 
 void ChannelImpl::sample_on_device(CompNode device, bool force) {
+    if (!Profiler::is_profiling()) {
+        return;
+    }
     if (!force) {
         thread_local int last_sample_id = 0;
-        int sample_rate =
Profiler::get_option("sample_rate", 0) : 0; + int sample_rate = Profiler::get_option("sample_rate", 0); if (!sample_rate || ((++last_sample_id) % sample_rate != 0)) { return; } diff --git a/imperative/src/impl/interpreter/interpreter_impl.h b/imperative/src/impl/interpreter/interpreter_impl.h index 586c0000..970c2b2c 100644 --- a/imperative/src/impl/interpreter/interpreter_impl.h +++ b/imperative/src/impl/interpreter/interpreter_impl.h @@ -77,7 +77,7 @@ private: struct State; TensorInfo* alloc(); - void init(TensorInfo*, LogicalTensorDesc desc); + void init(TensorInfo*, LogicalTensorDesc&& desc); void free(TensorInfo*); void real_free(TensorInfo*); void recursive_free(TensorInfo*); @@ -132,6 +132,8 @@ private: MemPool m_pool; std::unordered_set m_valid_handle; TensorInfo* m_waitee = nullptr; + Spinlock m_pool_spin; + Spinlock m_info_spin; uint64_t m_waitee_id = 0; std::exception_ptr m_worker_exc; std::function m_profile_dump_callback; diff --git a/imperative/src/impl/op_def.cpp b/imperative/src/impl/op_def.cpp index c7e67343..49348789 100644 --- a/imperative/src/impl/op_def.cpp +++ b/imperative/src/impl/op_def.cpp @@ -39,7 +39,7 @@ DispatchMode OpDef::decide_dispatch_mode( } SmallVector OpDef::apply_on_physical_tensor( - const OpDef& def, SmallVector inputs, + const OpDef& def, const SmallVector& inputs, SmallVector& output_descs, const bool& validated) { return def.trait()->apply_on_physical_tensor( def, std::move(inputs), output_descs, validated); diff --git a/imperative/src/impl/op_trait.h b/imperative/src/impl/op_trait.h index 08c2166b..8398ed71 100644 --- a/imperative/src/impl/op_trait.h +++ b/imperative/src/impl/op_trait.h @@ -160,7 +160,7 @@ struct OpMeth : public thin_function { } return false; }; - while (!this->Base::operator bool()) { + while (mgb_unlikely(!this->Base::operator bool())) { using Mode = OpMethFallbackMode; if (match_mode(Mode::FromSubgraph)) { OpMethFallbackFromSubgraph::impl(*const_cast(this), Tag{}); diff --git a/imperative/src/include/megbrain/imperative/blob_manager.h b/imperative/src/include/megbrain/imperative/blob_manager.h index 9732c1df..388b3249 100644 --- a/imperative/src/include/megbrain/imperative/blob_manager.h +++ b/imperative/src/include/megbrain/imperative/blob_manager.h @@ -27,7 +27,7 @@ public: virtual void alloc_with_defrag(Blob* blob, size_t size) = 0; virtual DeviceTensorND alloc_workspace_with_defrag( - CompNode cn, TensorLayout layout) = 0; + CompNode cn, TensorLayout& layout) = 0; virtual void register_blob(Blob* blob) = 0; diff --git a/imperative/src/include/megbrain/imperative/op_def.h b/imperative/src/include/megbrain/imperative/op_def.h index 8bd01fba..30278563 100644 --- a/imperative/src/include/megbrain/imperative/op_def.h +++ b/imperative/src/include/megbrain/imperative/op_def.h @@ -51,7 +51,7 @@ public: const OpDef& def, const SmallVector& inputs); static SmallVector apply_on_physical_tensor( - const OpDef& def, SmallVector inputs, + const OpDef& def, const SmallVector& inputs, SmallVector& output_descs, const bool& validated); /*! -- GitLab