From 5b4f7c5dd014aec3c9ba46eff2c3055e87daf09d Mon Sep 17 00:00:00 2001
From: Megvii Engine Team <megengine@megvii.com>
Date: Mon, 2 Aug 2021 16:54:15 +0800
Subject: [PATCH] perf(interpreter): unwind ops with make_forward_graph

GitOrigin-RevId: 5fb8c85089f507d31a7e4d8552089660c068a4ad
---
 .../src/impl/interpreter/interpreter_impl.cpp | 100 ++++++++++++------
 .../src/impl/interpreter/interpreter_impl.h   |   4 +
 2 files changed, 71 insertions(+), 33 deletions(-)

diff --git a/imperative/src/impl/interpreter/interpreter_impl.cpp b/imperative/src/impl/interpreter/interpreter_impl.cpp
index 2de9b78f..64d3b598 100644
--- a/imperative/src/impl/interpreter/interpreter_impl.cpp
+++ b/imperative/src/impl/interpreter/interpreter_impl.cpp
@@ -137,8 +137,11 @@ TensorInfo* ChannelImpl::put_impl(const HostTensorND& value, bool no_cache) {
 
 Handle ChannelImpl::put(const DeviceTensorND& data, const HostTensorND& hvalue) {
     MGB_LOCK_GUARD(m_spin);
-    auto& state = get_channel_state();
     mgb_assert(check_available(), "Channel already closed");
+    return put_impl(data, hvalue);
+}
+TensorInfo* ChannelImpl::put_impl(const DeviceTensorND& data, const HostTensorND& hvalue) {
+    auto& state = get_channel_state();
     state.scopes.push("Put");
     auto info = alloc();
     RECORD_EVENT(TensorCommandEvent, info->id, TensorCommandEvent::Put);
@@ -335,6 +338,12 @@ SmallVector<Handle> ChannelImpl::apply_op(
         const SmallVector<Handle>& inputs) {
     MGB_LOCK_GUARD(m_spin);
     mgb_assert(check_available(), "Channel already closed");
+    return apply_op_impl(std::move(op), inputs);
+}
+
+SmallVector<Handle> ChannelImpl::apply_op_impl(
+        std::shared_ptr<OpDef> op,
+        const SmallVector<Handle>& inputs) {
     auto& state = get_channel_state();
     for (auto i : inputs) {
         mgb_assert(m_valid_handle.find(i) != m_valid_handle.end(),
@@ -610,8 +619,12 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
     auto& state = get_worker_state();
     bool profiling_device = Profiler::is_profiling() && Profiler::get_option("profile_device", 0);
     uint64_t apply_id = cmd.id;
-    SmallVector<TensorPtr> tensor_inputs;
-    SmallVector<MemoryDesc> input_memory_desc;
+    struct TensorWithDesc {
+        TensorPtr tensor;
+        MemoryDesc desc;
+    };
+    SmallVector<TensorWithDesc> inputs;
+    // SmallVector<TensorPtr> tensor_inputs;
     if (state.options.enable_dtr_auto_drop) {
         m_dtr.pin(cmd.inputs);
     }
@@ -621,33 +634,59 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
         }
         m_dtr.update_used_time(i);
     }
-    tensor_inputs.reserve(cmd.inputs.size());
+    // tensor_inputs.reserve(cmd.inputs.size());
+    inputs.reserve(cmd.inputs.size());
     // refcnt == 1, owners: [TensorInfo::ptr]
     for (auto i : cmd.inputs) {
         mgb_assert(i->ptr, "Invalid input tensor ptr!");
-        mgb_assert(i->mem_desc.id, "Invalid input tensor mem desc!");
         // refcnt ++, owners: [i->ptr, tensor_inputs]
-        tensor_inputs.push_back(i->ptr);
-        input_memory_desc.push_back(i->mem_desc);
+        // tensor_inputs.push_back(i->ptr);
+        inputs.push_back({i->ptr, i->mem_desc});
     }
     if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) {
         auto_evict(0);
     }
-    auto [outputs_mem_desc, tensor_outputs, workspaces] = init_output_and_workspace(*cmd.op, tensor_inputs, input_memory_desc);
-    if (outputs_mem_desc.size()) {
-        for (size_t i = 0;i < outputs_mem_desc.size();i ++) {
-            if (cmd.outputs[i]) {
-                cmd.outputs[i]->mem_desc = outputs_mem_desc[i];
+    auto apply_on_physical_tensor = [&](auto&& self, const OpDef& def, SmallVector<TensorWithDesc> inputs) -> SmallVector<TensorWithDesc> {
+        auto apply_functor = [&](std::shared_ptr<OpDef> op, SmallVector<TensorWithDesc> inputs, size_t nr_outputs) -> SmallVector<TensorWithDesc> {
+            auto opname = op->trait()->make_name(*op);
+            auto outputs = self(self, *op, inputs);
+            return outputs;
+        };
+        auto const_functor = [&](TensorPtr value) -> TensorWithDesc {
+            return {value, MemoryDesc{value->layout(), 0, value->comp_node(), StorageIdentifier::make()}};
+        };
+        if (def.trait()->make_forward_graph) {
+            // apply recursively
+            SmallVector<LogicalTensorDesc> input_descs;
+            for (auto&& input: inputs) {
+                input_descs.push_back({{{}, input.tensor->dtype()}, input.tensor->comp_node()});
             }
+            auto forward_graph = OpDef::make_forward_graph(def, input_descs);
+            auto outputs = forward_graph.apply(inputs, apply_functor, const_functor);
+            return outputs;
         }
-    } else {
-        // fail to infer mem plan
-        for (auto && out : cmd.outputs) {
-            if (out) {
-                out->mem_desc.id = StorageIdentifier::make();
+        SmallVector<TensorPtr> input_tensors;
+        SmallVector<MemoryDesc> input_descs;
+        // size_t next_mem_desc_id = 0;
+        for (auto&& input: inputs) {
+            input_tensors.push_back(input.tensor);
+            input_descs.push_back(input.desc);
+        }
+        auto [output_descs, output_tensors, workspaces] = init_output_and_workspace(def, input_tensors, input_descs);
+        if (!output_descs.empty()) {
+            OpDef::execute(def, input_tensors, output_tensors, workspaces);
+        } else {
+            output_tensors = OpDef::apply_on_physical_tensor(def, input_tensors);
+            for (auto&& output_tensor: output_tensors) {
+                output_descs.push_back(MemoryDesc{output_tensor->layout(), 0, output_tensor->comp_node(), StorageIdentifier::make()});
             }
         }
-    }
+        SmallVector<TensorWithDesc> outputs;
+        for (auto&& [output_tensor, output_desc]: ranges::zip_view(output_tensors, output_descs)) {
+            outputs.push_back({output_tensor, output_desc});
+        }
+        return outputs;
+    };
     RECORD_EVENT(OpExecuteEvent, apply_id);
     // Begin profiling operator
     SmallVector<std::pair<CompNode, uint64_t>> kernels;
@@ -686,20 +725,14 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
     }
     // Apply op
     // Here std::move is REQUIRED for removing duplicated references.
-    if (outputs_mem_desc.size()) {
-        OpDef::execute(
-            *cmd.op, std::move(tensor_inputs), tensor_outputs, std::move(workspaces));
-    } else {
-        tensor_outputs = OpDef::apply_on_physical_tensor(
-            *cmd.op, std::move(tensor_inputs));
-    }
+    auto outputs = apply_on_physical_tensor(apply_on_physical_tensor, *cmd.op, inputs);
     // After execute
     for (auto&& [device, kernel_id]: kernels) {
         RECORD_EVENT(KernelExecuteFinishEvent, apply_id, kernel_id, Timer::record_event(device));
     }
     // End profiling operator
-    mgb_assert(tensor_outputs.size() == cmd.outputs.size());
-    for (size_t i = 0; i < tensor_outputs.size(); ++i) {
+    mgb_assert(outputs.size() == cmd.outputs.size());
+    for (size_t i = 0; i < outputs.size(); ++i) {
         auto output = cmd.outputs[i];
         if (output == nullptr) {
             RECORD_EVENT(OpOutputEvent, 0);
@@ -709,7 +742,8 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
             RECORD_EVENT(OpOutputFinishEvent, output->id);
         } else {
             RECORD_EVENT(OpOutputEvent, output->id);
-            produce_tensor(output, tensor_outputs[i]);
+            produce_tensor(output, outputs[i].tensor);
+            output->mem_desc = outputs[i].desc;
             RECORD_EVENT(OpOutputFinishEvent, output->id);
             sample_on_device(output->desc.comp_node, false);
         }
@@ -720,8 +754,8 @@ void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
         for (auto i : cmd.inputs) {
             estimate_compute_time += i->memory;
         }
-        for (auto i : tensor_outputs) {
-            estimate_compute_time += i->blob()->size();
+        for (auto i : outputs) {
+            estimate_compute_time += i.tensor->blob()->size();
         }
         m_dtr.estimate_timestamp += estimate_compute_time / 1e8;
         for (auto i : cmd.outputs) {
@@ -751,7 +785,7 @@ void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
     }
 }
-bool ChannelImpl::auto_evict(size_t force_num=0) {
+bool ChannelImpl::auto_evict(size_t force_num) {
     auto& state = get_worker_state();
    if (!m_dtr.comp_node.valid()) {
        return false;
    }
@@ -884,7 +918,7 @@ void ChannelImpl::alloc_tensor_with_evict(TensorPtr x) {
             set_log_level(pre_level);
             mgb_log_warn("reallocating all cuda memory to alleviate fragmentation, the performance may be affected");
             set_log_level(LogLevel::NO_LOG);
-            BlobManager::inst()->defrag(x->blob()->comp_node());
+            BlobManager::inst()->defrag(x->comp_node());
             BlobManager::inst()->alloc_direct(x->blob().get(), x->blob()->size());
         }
     });
@@ -914,7 +948,7 @@ std::tuple<SmallVector<MemoryDesc>, SmallVector<TensorPtr>, SmallVector<TensorPtr>> ChannelImpl::init_output_and_workspace(
         for (size_t i = 0; i < desc.size(); i ++) {
             if (desc[i].id->is_sys_alloc()) {
                 tensors.push_back(Tensor::make(desc[i].layout, desc[i].cn));
-                if (!desc[i].layout.is_empty() && state.options.enable_dtr_auto_drop) {
+                if (state.options.enable_dtr_auto_drop && !desc[i].layout.is_empty()) {
                     alloc_tensor_with_evict(tensors.back());
                 }
             } else if (desc[i].id->is_from_other()) {
diff --git a/imperative/src/impl/interpreter/interpreter_impl.h b/imperative/src/impl/interpreter/interpreter_impl.h
index 4422f55d..bed3bf83 100644
--- a/imperative/src/impl/interpreter/interpreter_impl.h
+++ b/imperative/src/impl/interpreter/interpreter_impl.h
@@ -85,8 +85,12 @@ private:
     void detach_users(TensorInfo*);
 
     TensorInfo* put_impl(const HostTensorND& value, bool no_cache);
+    TensorInfo* put_impl(const DeviceTensorND& value, const HostTensorND& hvalue);
     void del_impl(Handle);
     void sync_impl();
+    SmallVector<Handle> apply_op_impl(
+        std::shared_ptr<OpDef> op,
+        const SmallVector<Handle>& inputs);
     TensorPtr wait_tensor(TensorInfo* info, profiler::TensorProp prop);
     void notify_tensor_unsafe(TensorInfo* info);
-- 
GitLab
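
Background note (not part of the patch): the core of this change is the self-recursive generic lambda apply_on_physical_tensor in do_apply_op. When an op's trait provides make_forward_graph, the op is unwound into a forward graph and each sub-op is applied by recursing through apply_functor; otherwise the op executes directly, falling back to OpDef::apply_on_physical_tensor when no memory plan can be inferred. Because a C++14 generic lambda cannot refer to itself by name, it receives itself as its first parameter (self) and is invoked as f(f, ...). The sketch below illustrates only that recursion idiom; Op, sub_ops, and run are hypothetical stand-ins, not MegEngine types.

    #include <cstdio>
    #include <vector>

    // Hypothetical stand-in for an op: a leaf executes directly, while an
    // op carrying sub_ops must be unwound and applied recursively, mirroring
    // the make_forward_graph branch in do_apply_op.
    struct Op {
        const char* name;
        std::vector<Op> sub_ops;
    };

    int main() {
        // The lambda takes itself as the first argument ("self") so it can
        // recurse without having a name it could refer to.
        auto run = [](auto&& self, const Op& op) -> void {
            if (!op.sub_ops.empty()) {
                for (const Op& sub : op.sub_ops) {
                    self(self, sub);  // unwind: apply each sub-op recursively
                }
                return;
            }
            std::printf("execute %s\n", op.name);  // leaf: execute directly
        };
        Op fused{"fused_op", {{"matmul", {}}, {"add", {}}}};
        run(run, fused);  // prints "execute matmul" then "execute add"
    }

The patch invokes its lambda the same way, as apply_on_physical_tensor(apply_on_physical_tensor, *cmd.op, inputs); a named member function or std::function would also work, at the cost of extra plumbing or type erasure.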