Commit f2027b8d authored by Megvii Engine Team, committed by huangxinda

refactor(interpreter): recompute with do_apply_op

GitOrigin-RevId: 150fb46a6377724f975ad066f5e9c6f93f30d317
Parent 5cb35c1b
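
In outline, this commit deduplicates operator execution: the ApplyOp branch of process_one_task and recompute previously each carried their own pin/regenerate/apply/produce sequence, and both now delegate to a shared do_apply_op. A per-apply id is added to ApplyOp and ComputePath so a recomputation replays under the id assigned at dispatch time. A stand-alone sketch of the resulting call structure (all types here are stubs; the real ones appear in the hunks below):

#include <cstdio>

struct ApplyOp { int id; const char* op; };      // stub for the real command
struct ComputePath { int id; const char* op; };  // stub for the recorded path

void do_apply_op(const ApplyOp& cmd) {           // the one shared execution path
    std::printf("apply #%d: %s\n", cmd.id, cmd.op);
}

void recompute(ComputePath* path) {
    // Replay under the dispatch-time id, then do recompute-only
    // bookkeeping (recompute_times, DSU update).
    do_apply_op(ApplyOp{path->id, path->op});
}

int main() {
    ComputePath p{7, "matmul"};
    recompute(&p);  // prints: apply #7: matmul
}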
......@@ -43,6 +43,7 @@ struct Put {
};
struct ApplyOp {
uint64_t id;
std::shared_ptr<OpDef> op;
SmallVector<TensorInfo*> inputs;
SmallVector<TensorInfo*> outputs;
......
......@@ -23,9 +23,9 @@ DEF_EVENT(Command, {
IdentifiedCommand icmd;
});
DEF_EVENT(CommandEnqueue, :CommandEvent);
DEF_EVENT(CommandExecute, :CommandEvent);
DEF_EVENT(CommandFinish, :CommandEvent);
DEF_EVENT(CommandEnqueue, :CommandEvent {});
DEF_EVENT(CommandExecute, :CommandEvent {});
DEF_EVENT(CommandFinish, :CommandEvent {});
DEF_DUR_EVENT(OpExecute, {
uint64_t id;
std::shared_ptr<OpDef> op;
......
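
The added braces are not cosmetic: DEF_EVENT appears to take a struct body (optionally preceded by a base-class list) as its second argument, as the `DEF_EVENT(Command, { IdentifiedCommand icmd; });` form above shows, so derived events with no extra fields need an explicit empty `{}`. A hypothetical reconstruction of the macro, assuming it pastes the second argument after the struct head (the real definition lives in this events header):

#define DEF_EVENT(name, ...) struct name##Event __VA_ARGS__;
struct CommandEvent { int icmd; };               // stub for the real base
DEF_EVENT(CommandEnqueue, :CommandEvent {})      // struct CommandEnqueueEvent : CommandEvent {};
// The old form, DEF_EVENT(CommandEnqueue, :CommandEvent), would expand to
// "struct CommandEnqueueEvent : CommandEvent;", which is ill-formed: a
// forward declaration cannot carry a base-class list.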
......@@ -11,6 +11,8 @@
#include "./interpreter_impl.h"
#include "range/v3/all.hpp"
#include "megbrain/common.h"
#include "megbrain/imperative/opr_utility.h"
#include "megbrain/imperative/ops/autogen.h"
......@@ -34,6 +36,16 @@ using namespace interpreter::intl;
} \
namespace {
auto tinfo_to_tid(SmallVector<TensorInfo*> tinfo) {
SmallVector<uint64_t> tid;
for (auto* ptinfo: tinfo) {
tid.push_back(ptinfo->id);
}
return tid;
};
}
std::thread::id ChannelImpl::get_worker_tid() {
return m_worker_state.tid;
}
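
The tinfo_to_tid lambda that was previously duplicated at each call site (see the removed copies in the next hunks) is hoisted into this anonymous namespace. For reference, an equivalent formulation using the range-v3 machinery this commit newly includes might look like this (a sketch, not part of the commit; std::vector stands in for SmallVector):

#include <cstdint>
#include <cstdio>
#include <vector>
#include <range/v3/all.hpp>

struct TensorInfo { uint64_t id; };  // stand-in for the real TensorInfo

std::vector<uint64_t> tinfo_to_tid(const std::vector<TensorInfo*>& tinfo) {
    return tinfo
        | ranges::views::transform([](auto* p) { return p->id; })
        | ranges::to<std::vector<uint64_t>>();
}

int main() {
    TensorInfo a{1}, b{2};
    for (auto id : tinfo_to_tid({&a, &b}))
        std::printf("%llu\n", (unsigned long long)id);
}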
......@@ -170,13 +182,6 @@ void ChannelImpl::dispatch_default_cpu(
output_tensornds.emplace_back(HostTensorND(output_cn, desc.layout).proxy_to_default_cpu());
}
auto tinfo_to_tid = [&](SmallVector<TensorInfo*> tinfo) {
SmallVector<uint64_t> tid;
for (auto* ptinfo: tinfo) {
tid.push_back(ptinfo->id);
}
return tid;
};
auto apply_id = ++m_last_id;
RECORD_EVENT(OpExecuteEvent, apply_id, op, tinfo_to_tid(input_infos), {});
......@@ -206,7 +211,7 @@ void ChannelImpl::dispatch_kernel(
auto& state = get_channel_state();
auto [output_descs, validated] = OpDef::infer_output_attrs_fallible(*op, input_descs);
ApplyOp cmd{std::move(op)};
ApplyOp cmd{++m_last_id, std::move(op)};
cmd.inputs = std::move(input_infos);
cmd.outputs.reserve(output_descs.size());
outputs->reserve(output_descs.size());
......@@ -527,30 +532,96 @@ void ChannelImpl::regenerate(TensorInfo* dest) {
}
}
void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
void ChannelImpl::do_apply_op(const ApplyOp& cmd) {
using namespace ranges;
using namespace ranges::views;
auto& state = get_worker_state();
SmallVector<TensorPtr> inputs;
inputs.reserve(path->inputs.size());
m_dtr.pin(path->inputs);
for (auto i : path->inputs) {
if (!i->ptr) {
uint64_t apply_id = cmd.id;
SmallVector<TensorPtr> tensor_inputs;
if (state.options.enable_dtr_auto_drop) {
m_dtr.pin(cmd.inputs);
}
for (auto i : cmd.inputs) {
if (!i->ptr && i->evict_type != EvictType::NONE) {
regenerate(i);
}
inputs.push_back(i->ptr);
// inputs.push_back(i->ptr);
m_dtr.update_used_time(i);
}
tensor_inputs.reserve(cmd.inputs.size());
// refcnt == 1, owners: [TensorInfo::ptr]
for (auto i : cmd.inputs) {
mgb_assert(i->ptr, "Invalid input tensor ptr!");
tensor_inputs.push_back(i->ptr);
}
// Begin profiling operator
SmallVector<CompNode> devices;
if (state.profiler->is_profiling()) {
for (auto&& i : concat(cmd.inputs, cmd.outputs)) {
if (i != nullptr && count(devices, i->desc.comp_node) == 0) {
devices.push_back(i->desc.comp_node);
}
}
}
for (auto* del : cmd.dels) {
free(del);
}
RECORD_EVENT(OpExecuteEvent, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
for (auto&& device: devices) {
sync_device_scope(device);
RECORD_DEVICE_EVENT(KernelExecuteEvent, device, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
}
if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) {
auto_evict();
}
auto outputs = OpDef::apply_on_physical_tensor(*path->op, inputs);
m_dtr.estimate_timestamp += path->compute_time / 1e8;
m_dtr.unpin(path->inputs);
for (size_t i = 0;i < outputs.size();i ++) {
// Apply op
// Here std::move is REQUIRED for removing duplicated references.
auto tensor_outputs = OpDef::apply_on_physical_tensor(
*cmd.op, std::move(tensor_inputs));
// After execute
for (auto&& device : devices) {
RECORD_DEVICE_EVENT(KernelExecuteFinishEvent, device, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
}
RECORD_EVENT(OpExecuteFinishEvent, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
// End profiling operator
mgb_assert(tensor_outputs.size() == cmd.outputs.size());
for (size_t i = 0; i < tensor_outputs.size(); ++i) {
auto output = cmd.outputs[i];
if (output != nullptr && output->ptr == nullptr) {
produce_tensor(output, tensor_outputs[i]);
}
}
if (state.options.enable_dtr_auto_drop) {
double estimate_compute_time = 0;
for (auto i : cmd.inputs) {
estimate_compute_time += i->memory;
}
for (auto i : tensor_outputs) {
estimate_compute_time += i->blob()->size();
}
m_dtr.estimate_timestamp += estimate_compute_time / 1e8;
for (auto i : cmd.outputs) {
if (i != nullptr) {
i->compute_time = estimate_compute_time;
}
}
m_dtr.unpin(cmd.inputs);
}
}
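
The "// refcnt" comments and the std::move at the apply call encode one invariant: by the time the kernel runs, a tensor whose TensorInfo was freed by a fused Del should be uniquely owned by tensor_inputs, so apply_on_physical_tensor can detect refcnt == 1 and reuse the storage in place. A minimal illustration of that ownership counting with plain shared_ptr (hypothetical, not MegEngine types):

#include <cstdio>
#include <memory>
#include <utility>
#include <vector>

int main() {
    auto blob = std::make_shared<int>(42);           // owners: [blob]
    std::vector<std::shared_ptr<int>> inputs{blob};  // owners: [blob, inputs]
    blob.reset();                                    // fused Del: owners: [inputs]
    auto kernel = [](std::vector<std::shared_ptr<int>> ins) {
        // With std::move at the call site this prints 1, so the storage
        // could legally be reused in place; without the move it prints 2.
        std::printf("use_count = %ld\n", ins[0].use_count());
    };
    kernel(std::move(inputs));
}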
void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
auto& state = get_worker_state();
do_apply_op(ApplyOp{path->id, path->op, path->inputs, path->outputs, {}});
for (size_t i = 0; i < path->outputs.size(); i++) {
auto&& o = path->outputs[i];
if (o) {
o->recompute_times ++;
if (!o->ptr) {
produce_tensor(o, std::move(outputs[i]), false);
if (state.options.enable_dtr_auto_drop) {
m_dtr.update_dsu_after_recompute(o);
}
......@@ -641,6 +712,9 @@ void ChannelImpl::sync_device_scope(CompNode device) {
}
void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
using namespace ranges;
using namespace ranges::views;
auto& state = get_worker_state();
RECORD_EVENT(CommandExecuteEvent, icmd);
bool finished = false;
......@@ -658,129 +732,25 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
auto value = cmd.no_cache ? std::make_shared<Tensor>(cmd.value) : Tensor::make(cmd.value);
produce_tensor(cmd.dest, std::move(value));
} else if constexpr (std::is_same_v<T, ApplyOp>) {
uint64_t apply_id = ++m_last_id;
SmallVector<TensorPtr> tensor_inputs;
SmallVector<CompNode> devices;
if (state.options.enable_dtr_auto_drop) {
m_dtr.pin(cmd.inputs);
}
for (auto i : cmd.inputs) {
if (!i->ptr && i->evict_type != EvictType::NONE) {
regenerate(i);
}
m_dtr.update_used_time(i);
}
tensor_inputs.reserve(cmd.inputs.size());
// refcnt == 1, owners: [TensorInfo::ptr]
for (auto i : cmd.inputs) {
mgb_assert(i->ptr, "Invalid input tensor ptr!");
// refcnt ++, owners: [i->ptr, tensor_inputs]
tensor_inputs.push_back(i->ptr);
}
// Begin profiling operator
auto tinfo_to_tid = [&](SmallVector<TensorInfo*> tinfo) {
SmallVector<uint64_t> tid;
for (auto* ptinfo: tinfo) {
tid.push_back(ptinfo->id);
}
return tid;
};
if (state.profiler->is_profiling()) {
// Collecting devices
for (auto i : cmd.inputs) {
devices.push_back(i->desc.comp_node);
}
for (auto i : cmd.outputs) {
devices.push_back(i->desc.comp_node);
}
devices.erase(std::unique(devices.begin(), devices.end()), devices.end());
}
// Fused by command buffer. @see: CommandBuffer::fuse_del
// Now if dest is inplaceable, its refcnt will be decreased to 1 and owned by tensor_inputs after Del.
// Note for exprs like 'y = x op x', inplace is unsupported yet but Del would be also fused.
for (auto* del : cmd.dels) {
// refcnt --, owners: [tensor_inputs]
// if it's decreased to 1, would be detected at @see: proxy_graph_detail::apply_on_physical_tensor
free(del);
}
// Before wait
//TODO: split operator wait and execute so that OpWait can be correctly recorded.
// Before execute
RECORD_EVENT(OpExecuteEvent, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
if (state.profiler->is_profiling()) {
for (auto&& device: devices) {
sync_device_scope(device);
RECORD_DEVICE_EVENT(KernelExecuteEvent, device, apply_id, cmd.op,
tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
}
}
if (state.options.enable_dtr_auto_drop && state.options.dtr_eviction_threshold > 0) {
auto_evict();
}
// Apply op
// Here std::move is REQUIRED for removing duplicated references.
auto tensor_outputs = OpDef::apply_on_physical_tensor(
*cmd.op, std::move(tensor_inputs));
// After execute
RECORD_EVENT(OpExecuteFinishEvent, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
if (state.profiler->is_profiling()) {
for (auto&& device: devices) {
RECORD_DEVICE_EVENT(KernelExecuteFinishEvent, device, apply_id, cmd.op, tinfo_to_tid(cmd.inputs), tinfo_to_tid(cmd.outputs));
}
}
// End profiling operator
double estimate_compute_time = 0;
if (state.options.enable_dtr_auto_drop) {
for (auto i : cmd.inputs) {
estimate_compute_time += i->memory;
}
for (auto i : tensor_outputs) {
estimate_compute_time += i->blob()->size();
}
m_dtr.estimate_timestamp += estimate_compute_time / 1e8;
for (auto i : cmd.outputs) {
i->compute_time = estimate_compute_time;
m_dtr.update_used_time(i);
}
if (cmd.outputs[0]->producer) {
cmd.outputs[0]->producer->compute_time = estimate_compute_time;
}
m_dtr.unpin(cmd.inputs);
}
mgb_assert(tensor_outputs.size() == cmd.outputs.size());
for (size_t i = 0; i < tensor_outputs.size(); ++i) {
if (cmd.outputs[i] == nullptr) {
do_apply_op(cmd);
for (size_t i = 0; i < cmd.outputs.size(); ++i) {
auto output = cmd.outputs[i];
if (output == nullptr) {
continue;
}
produce_tensor(cmd.outputs[i], std::move(tensor_outputs[i]));
if (state.options.enable_dtr_auto_drop) {
cmd.outputs[i]->dsu_ptr = std::make_shared<DsuNode>(estimate_compute_time);
cmd.outputs[i]->dsu_ptr = std::make_shared<DsuNode>(output->compute_time);
}
}
if (state.options.enable_drop == 1
&& state.options.record_computing_path == 1){
bool is_inplace = false;
bool cross_cn = false;
for (auto input : cmd.inputs) {
for (auto output : cmd.outputs) {
if (input->ptr->blob()->storage() == output->ptr->blob()->storage()) {
is_inplace = true;
break;
}
if (state.options.enable_drop && state.options.record_computing_path) {
auto is_inplace = [](std::tuple<TensorInfo*, TensorInfo*> tuple2) {
auto& input = std::get<0>(tuple2);
auto& output = std::get<1>(tuple2);
if (!input->ptr || !output->ptr) {
return false;
}
}
for (auto input : cmd.inputs) {
if (input->ptr->comp_node() != m_dtr.comp_node) {
cross_cn = true;
break;
}
}
for (auto output : cmd.outputs) {
if (output->ptr->comp_node() != m_dtr.comp_node) {
cross_cn = true;
break;
}
}
return input->ptr->blob()->storage() == output->ptr->blob()->storage();
};
// FIXME: do not use opname as identifier
auto get_name = [](const OpDef& opdef) {
if (auto attr = opdef.try_cast_final<OprAttr>()) {
......@@ -788,8 +758,15 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
}
return opdef.dyn_typeinfo()->name;
};
if (!is_inplace && !cross_cn && !m_dtr.is_bad_op(get_name(*cmd.op))) {
TensorInfo::ComputePath::make(cmd.op, cmd.inputs, cmd.outputs);
auto is_cross_cn = [comp_node=m_dtr.comp_node](TensorInfo* info){
return info->desc.comp_node != comp_node;
};
bool cross_cn = any_of(concat(cmd.inputs, cmd.outputs), is_cross_cn);
bool inplace = any_of(cartesian_product(cmd.inputs, cmd.outputs), is_inplace);
if (!inplace && !cross_cn && !m_dtr.is_bad_op(get_name(*cmd.op))) {
TensorInfo::ComputePath::make(cmd.id, cmd.op, cmd.inputs, cmd.outputs);
size_t detach_cnt = 0;
for (auto output : cmd.outputs) {
if (!output->size_exceeds_thd(state.options.dtr_evictee_minimum_size)) {
......
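
The rewritten checks above replace two hand-rolled nested loops with range-v3 combinators: cross-device use becomes any_of over the concatenated inputs and outputs, and in-place aliasing becomes any_of over the input×output cartesian product. A self-contained demo of the same combinators (plain ints stand in for TensorInfo* and the predicates are toy versions):

#include <cstdio>
#include <tuple>
#include <vector>
#include <range/v3/all.hpp>

int main() {
    using namespace ranges;
    using namespace ranges::views;
    std::vector<int> inputs{1, 2}, outputs{3, 2};
    // concat views both containers as one range; any_of short-circuits.
    bool has_three = any_of(concat(inputs, outputs),
            [](int v) { return v == 3; });
    // cartesian_product yields every (input, output) pair as a tuple.
    bool aliased = any_of(cartesian_product(inputs, outputs),
            [](std::tuple<int, int> t) { return std::get<0>(t) == std::get<1>(t); });
    std::printf("has_three=%d aliased=%d\n", has_three, aliased);  // 1 1
}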
......@@ -90,6 +90,7 @@ private:
void regenerate(TensorInfo* dest);
void recompute(TensorInfo::ComputePath* path);
void do_apply_op(const ApplyOp& cmd);
void dispatch_default_cpu(
std::shared_ptr<OpDef> op,
......
......@@ -75,18 +75,19 @@ struct TensorInfo {
std::shared_ptr<DsuNode> dsu_ptr;
struct ComputePath {
uint64_t id;
std::shared_ptr<OpDef> op;
SmallVector<TensorInfo*> inputs;
SmallVector<TensorInfo*> unique_inputs;
SmallVector<TensorInfo*> outputs;
double compute_time = 0;
size_t ref_cnt() {
return outputs.size() - std::count(outputs.begin(), outputs.end(), nullptr);
}
static ComputePath* make(std::shared_ptr<OpDef> op, SmallVector<TensorInfo*> inputs, SmallVector<TensorInfo*> outputs) {
static ComputePath* make(uint64_t id, std::shared_ptr<OpDef> op, SmallVector<TensorInfo*> inputs, SmallVector<TensorInfo*> outputs) {
auto* path = new TensorInfo::ComputePath();
path->id = id;
path->op = op;
path->inputs = inputs;
path->outputs = outputs;
......
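
ComputePath::make now records the dispatch-time id so profiling events emitted during recomputation correlate with the original apply. ref_cnt() counts outputs that are still attached: a detached output is nulled out rather than erased, keeping indices stable. A minimal sketch of that liveness rule (stub types, not the real structs):

#include <algorithm>
#include <cstdio>
#include <vector>

struct Output {};
struct ComputePath {
    std::vector<Output*> outputs;
    size_t ref_cnt() const {
        return outputs.size() - std::count(outputs.begin(), outputs.end(), nullptr);
    }
};

int main() {
    Output a, b;
    ComputePath path{{&a, &b}};
    std::printf("%zu\n", path.ref_cnt());  // 2
    path.outputs[0] = nullptr;             // detach one output
    std::printf("%zu\n", path.ref_cnt());  // 1: path must stay alive
}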