Commit fe99cdc7 authored by: Megvii Engine Team

feat(interpreter): add dynamic sublinear

GitOrigin-RevId: 7de54fe7433fcad91d36b91297c87ccf9eaa5fe5
Parent 62c394ca
......@@ -3,7 +3,7 @@ from collections import defaultdict
from contextlib import contextmanager
from typing import Callable
from ..core._imperative_rt.core2 import pop_scope, push_scope
from ..core._imperative_rt.core2 import pop_scope, push_scope, set_option
from ..core.autodiff.grad import Grad
from ..logger import get_logger
from ..tensor import Tensor
......@@ -241,6 +241,7 @@ class GradManager:
:param dy: tensor or list of tensors. Defaults to 1 if y is scalar
"""
push_scope("backward")
set_option("record_computing_path", 0)
from ..functional import ones_like
global backwarding_grad_manager
......@@ -284,6 +285,7 @@ class GradManager:
finally:
self.release()
backwarding_grad_manager = cache
set_option("record_computing_path", 1)
pop_scope("backward")
def record(self):
......
......@@ -15,7 +15,7 @@ from typing import Union
import numpy as np
from ..core._imperative_rt.core2 import pop_scope, push_scope
from ..core._imperative_rt.core2 import pop_scope, push_scope, set_option
from ..core.tensor.utils import set_convert_inputs
from ..tensor import Parameter, Tensor
from ..utils.deprecation import deprecated
......@@ -148,6 +148,7 @@ class Optimizer(metaclass=ABCMeta):
"""
# set the global state `_enable_convert_inputs` to `False` to disable
# the `convert_inputs` for param updates
set_option("record_computing_path", 0)
backup = set_convert_inputs(False)
for group in self.param_groups:
if isinstance(group["params"], set):
......@@ -161,6 +162,7 @@ class Optimizer(metaclass=ABCMeta):
pop_scope("step")
# restore the global state `_enable_convert_inputs`
set_convert_inputs(backup)
set_option("record_computing_path", 1)
return self
@deprecated(version="1.0", reason="use clear_grad instead")
......
# -*- coding: utf-8 -*-
# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
#
# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
from ..core._imperative_rt.core2 import set_option
from ..core._imperative_rt.utils import _set_defrag
class DTR:
r"""
DTR implements `Dynamic Tensor Rematerialization <https://arxiv.org/abs/2006.09616>`_ in MegEngine.
It is an online checkpointing algorithm driven by heuristic eviction policies.
.. code-block::
from megengine.utils.dtr import DTR
ds = DTR(memory_budget=5*1024**3)
# your training code
"""
def __init__(self, memory_budget=0, tensor_lowerbound=1048576):
r"""
:param memory_budget: int. The threshold of memory usage, in bytes. When memory
    usage exceeds this value, automatic eviction is triggered. Default: 0,
    which leaves DTR disabled.
:param tensor_lowerbound: int. The minimum size, in bytes, of a tensor that
    is eligible for eviction. Default: 1MB.
"""
if memory_budget > 0:
set_option("enable_auto_drop", 1)
set_option("enable_drop", 1)
set_option("buffer_length", 0)
set_option("memory_budget", memory_budget)
set_option("tensor_lowerbound", tensor_lowerbound)
set_option("record_computing_path", 1)
_set_defrag(True)
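For reference, the DTR utility above is meant to be enabled once before an ordinary training loop. The following is a minimal usage sketch; the tiny `Linear` model, random data, and 4 GB budget are illustrative placeholders, not part of this commit:

```python
import numpy as np

import megengine as mge
import megengine.autodiff as ad
from megengine.module import Linear
from megengine.optimizer import SGD
from megengine.utils.dtr import DTR

# enable dynamic tensor rematerialization with a 4 GB memory budget
DTR(memory_budget=4 * 1024 ** 3)

net = Linear(16, 4)  # stand-in for a real network
opt = SGD(net.parameters(), lr=0.01, momentum=0.9)
gm = ad.GradManager().attach(net.parameters())

for _ in range(10):
    data = mge.Tensor(np.random.randn(32, 16).astype("float32"))
    label = mge.Tensor(np.random.randn(32, 4).astype("float32"))
    with gm:  # record the forward pass
        loss = ((net(data) - label) ** 2).mean()
        gm.backward(loss)  # record_computing_path is disabled during backward
    opt.step().clear_grad()
```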
......@@ -901,7 +901,7 @@ void init_tensor(py::module m) {
}
m.def("set_option",
[](std::string name, int value){ interpreter_for_py->set_option(name, value); });
[](std::string name, size_t value){ interpreter_for_py->set_option(name, value); });
m.def("get_option",
[](std::string name){ return interpreter_for_py->get_option(name); });
m.def("_set_swap_flag",
......
......@@ -15,7 +15,12 @@ import megengine as mge
import megengine.autodiff as ad
import megengine.functional as F
from megengine import Tensor
from megengine.core._imperative_rt.core2 import _set_drop_flag, _set_swap_flag
from megengine.core._imperative_rt.core2 import (
_set_drop_flag,
_set_swap_flag,
get_option,
set_option,
)
from megengine.module import Linear, Module
from megengine.optimizer import SGD
......@@ -79,7 +84,8 @@ class XORNet(Module):
def test_training_converge_with_swap_and_drop():
_set_swap_flag(True)
_set_drop_flag(True)
old_buffer_length = get_option("buffer_length")
set_option("buffer_length", 0)
net = XORNet()
opt = SGD(net.parameters(), lr=0.01, momentum=0.9, weight_decay=5e-4)
gm = ad.GradManager().attach(net.parameters())
......@@ -122,3 +128,4 @@ def test_training_converge_with_swap_and_drop():
_set_swap_flag(False)
_set_drop_flag(False)
set_option("buffer_length", old_buffer_length)
......@@ -128,7 +128,7 @@ struct Drop {
struct SetOption {
std::string key;
int value;
size_t value;
template <typename TFunctor>
void get_props(TFunctor&& functor) const {
......
......@@ -13,12 +13,11 @@
#include "megbrain/common.h"
#include "megbrain/imperative/opr_utility.h"
#include "megbrain/imperative/ops/backward_graph.h"
#include "megbrain/imperative/ops/autogen.h"
#include "megbrain/imperative/ops/backward_graph.h"
#include "megbrain/imperative/ops/opr_attr.h"
#include "megbrain/imperative/utils/to_string.h"
#include "../op_trait.h"
using namespace mgb;
using namespace imperative;
using namespace interpreter;
......@@ -61,8 +60,6 @@ Handle ChannelImpl::put(const DeviceTensorND& data) {
void ChannelImpl::del(Handle handle) {
mgb_assert(m_valid_handle.count(handle), "invalid handle: %p", handle);
auto* info = reinterpret_cast<TensorInfo*>(handle);
detach_users(info);
info->detach_producer();
m_valid_handle.erase(handle);
m_buffer.enqueue(Del{info});
}
......@@ -73,7 +70,6 @@ void ChannelImpl::swap_in(Handle handle) {
"invalid handle: %p", handle);
auto* info = reinterpret_cast<TensorInfo*>(handle);
m_buffer.enqueue(SwapIn{info});
info->evict_type = NONE;
}
}
......@@ -83,7 +79,6 @@ void ChannelImpl::swap_out(Handle handle) {
"invalid handle: %p", handle);
auto* info = reinterpret_cast<TensorInfo*>(handle);
m_buffer.enqueue(SwapOut{info});
info->evict_type = SWAP;
}
}
......@@ -92,11 +87,6 @@ void ChannelImpl::drop(Handle handle) {
mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
"invalid handle: %p", handle);
auto* info = reinterpret_cast<TensorInfo*>(handle);
if (!info->producer) {
mgb_log_warn("the input that produced tensor %p has been deleted, this drop operation will be ignored", info);
return;
}
info->evict_type = DROP;
m_buffer.enqueue(Drop{info});
}
}
......@@ -167,10 +157,6 @@ void ChannelImpl::dispatch_default_cpu(
outputs->push_back(info);
}
if (m_channel_state.options.enable_drop) {
TensorInfo::ComputePath::make(op, input_infos, output_infos);
}
event_data.outputs = tinfo_to_tid(output_infos);
if (m_channel_state.profiler->is_profiling()) {
m_channel_state.profiler->record_host<HostOpFinishEvent>(event_data);
......@@ -199,9 +185,6 @@ void ChannelImpl::dispatch_kernel(
cmd.outputs.push_back(info);
outputs->push_back(info);
}
if (m_channel_state.options.enable_drop) {
TensorInfo::ComputePath::make(cmd.op, cmd.inputs, cmd.outputs);
}
m_buffer.enqueue(std::move(cmd));
if (!validated && m_channel_state.options.async_level == 1) {
sync();
......@@ -233,7 +216,6 @@ SmallVector<Handle> ChannelImpl::apply_op(
mgb_assert(!info->invalid, "Invalid tensor, unable to apply_op!");
input_infos.push_back(info);
input_descs.push_back(info->desc);
regenerate(info);
}
}
......@@ -269,7 +251,6 @@ HostTensorND ChannelImpl::get_value(Handle handle) {
};
if (!value_fetched()) {
m_waitee = info;
regenerate(info);
m_buffer.enqueue(GetValue{info});
if (m_channel_state.profiler->is_profiling()) {
m_channel_state.profiler->record_host<TensorWaitPropEvent>(info->id, TensorInfo::HostValue);
......@@ -345,7 +326,6 @@ DeviceTensorND ChannelImpl::get_dev_tensor(Handle handle) {
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
mgb_assert(!m_waitee);
m_waitee = info;
regenerate(info);
m_buffer.flush();
if (m_channel_state.profiler->is_profiling()) {
m_channel_state.profiler->record_host<TensorWaitPropEvent>(info->id, TensorInfo::DevValue);
......@@ -379,11 +359,11 @@ void ChannelImpl::close() {
sync();
}
int ChannelImpl::get_option(std::string name) {
size_t ChannelImpl::get_option(std::string name) {
return m_channel_state.options.get_option(name);
}
void ChannelImpl::set_option(std::string name, int value) {
void ChannelImpl::set_option(std::string name, size_t value) {
m_channel_state.options.set_option(name, value);
m_buffer.enqueue(SetOption{name, value});
}
......@@ -399,11 +379,64 @@ TensorInfo* ChannelImpl::alloc() {
return info;
}
void ChannelImpl::do_drop(TensorInfo* ptr, bool user=false) {
if (!ptr->producer) {
if (user) {
mgb_log_warn("the input that produced tensor %p has been deleted, this drop operation will be ignored", ptr);
}
return;
}
if (ptr->evict_type != EvictType::NONE) {
return;
}
ptr->evict_type = EvictType::DROP;
release_tensor(ptr);
}
void ChannelImpl::free(TensorInfo* ptr) {
if (m_worker_state.options.enable_auto_drop) {
// Evicting a tensor, rather than freeing it outright, avoids pinning a
// potentially exploding amount of memory and allows us to save more
// memory overall.
ptr->allow_delete = true;
if (!ptr->ref_cnt) {
recursive_free(ptr);
} else {
do_drop(ptr);
}
} else {
real_free(ptr);
}
}
void ChannelImpl::recursive_free(TensorInfo* ptr) {
SmallVector<TensorInfo*> inps(0);
if (ptr->producer) {
for (auto i : ptr->producer->inputs) {
if (i && --i->ref_cnt == 0) {
inps.push_back(i);
}
}
}
real_free(ptr);
for (auto i : inps) {
if (i->allow_delete) {
recursive_free(i);
}
}
}
void ChannelImpl::real_free(TensorInfo* ptr) {
MGB_LOCK_GUARD(m_mutex);
if (m_channel_state.profiler->is_profiling()) {
m_channel_state.profiler->record_host<TensorEraseEvent>(ptr->id);
}
if (ptr->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) {
m_dtr.erase_candidate(ptr);
}
detach_users(ptr);
ptr->detach_producer();
m_pool.free(ptr);
}
......@@ -415,17 +448,24 @@ ChannelImpl::~ChannelImpl() {
close();
}
void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr) {
MGB_LOCK_GUARD(m_mutex);
if (m_worker_state.profiler->is_profiling()) {
void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr, bool notice=true) {
auto lock = notice ? std::unique_lock<std::mutex>(m_mutex)
: std::unique_lock<std::mutex>();
m_dtr.update_used_time(dest);
if (notice && m_worker_state.profiler->is_profiling()) {
m_worker_state.profiler->record_host<TensorProduceEvent>(dest->id, ptr->layout(), ptr->comp_node());
}
dest->value_fetched = ptr->value_fetched();
// update tensor desc for static infer
dest->desc.layout = ptr->layout();
dest->desc.comp_node = ptr->comp_node();
dest->memory = ptr->blob()->size();
dest->ptr = std::move(ptr);
if (m_waitee == dest) {
dest->evict_type = EvictType::NONE;
if (notice && dest->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) {
m_dtr.insert_candidate(dest);
}
if (notice && m_waitee == dest) {
m_cv.notify_all();
}
}
......@@ -436,37 +476,86 @@ void ChannelImpl::release_tensor(TensorInfo* dest) {
}
void ChannelImpl::regenerate(TensorInfo* dest) {
if (dest->evict_type == DROP) {
if (dest->evict_type == EvictType::DROP) {
recompute(dest->producer);
} else if (dest->evict_type == SWAP) {
swap_in(dest);
} else if (dest->evict_type == EvictType::SWAP) {
produce_tensor(dest, Tensor::make(dest->h_value));
}
mgb_assert(dest->evict_type == NONE);
}
void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
SmallVector<TensorInfo*> workspaces(path->outputs.size(), nullptr);
for (auto&& input: path->inputs) {
regenerate(input);
SmallVector<TensorPtr> inputs;
inputs.reserve(path->inputs.size());
m_dtr.pin(path->inputs);
for (auto i : path->inputs) {
if (!i->ptr) {
regenerate(i);
}
inputs.push_back(i->ptr);
m_dtr.update_used_time(i);
}
if (m_worker_state.options.enable_auto_drop && m_worker_state.options.memory_budget > 0) {
auto_evict();
}
auto outputs = OpDef::apply_on_physical_tensor(*path->op, inputs);
m_dtr.estimate_timestamp += path->compute_time / 1e8;
m_dtr.unpin(path->inputs);
for (size_t i = 0;i < outputs.size();i ++) {
auto&& o = path->outputs[i];
if (o) {
o->recompute_times ++;
if (!o->ptr) {
produce_tensor(o, std::move(outputs[i]), false);
if (m_worker_state.options.enable_auto_drop) {
m_dtr.update_dsu_after_recompute(o);
}
}
}
}
for (auto&& output: path->outputs) {
if(output == nullptr) {
continue;
}
void ChannelImpl::auto_evict() {
if (!m_dtr.comp_node.valid()) {
return;
}
size_t current_memory = m_dtr.comp_node.get_used_memory();
while (current_memory > m_worker_state.options.memory_budget) {
auto best = m_dtr.find_best_tensor();
if (!best) {
if (!m_dtr.warn_printed) {
m_dtr.warn_printed = true;
mgb_log_warn("No tensors on %s can be evicted automatically "
"when memory usage is %.0lfMB. Maybe memory "
"budget is too small.",
m_dtr.comp_node.to_string().c_str(),
current_memory / 1024.0 / 1024.0);
}
break;
}
if (best->ptr.unique() && best->ptr->blob().unique()) {
current_memory -= best->memory;
}
do_drop(best);
if (best->evict_type == EvictType::DROP) {
m_dtr.update_dsu_after_evict(best);
}
output->evict_type = NONE;
}
m_buffer.enqueue(ApplyOp{path->op, path->inputs, path->outputs});
}
void ChannelImpl::detach_users(TensorInfo* dest) {
SmallVector<TensorInfo::ComputePath*> users = dest->users;
for (auto* user: users) {
for (auto* output: user->outputs) {
SmallVector<TensorInfo*> outputs = user->outputs;
SmallVector<TensorInfo*> inputs = user->inputs;
for (auto* output: outputs) {
if (output == nullptr) {
continue;
}
regenerate(output);
output->detach_producer();
for (auto* input: inputs) {
input->ref_cnt --;
}
}
}
mgb_assert(dest->users.size() == 0);
......@@ -524,6 +613,15 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
uint64_t apply_id = ++m_last_id;
SmallVector<TensorPtr> tensor_inputs;
SmallVector<CompNode> devices;
if (m_worker_state.options.enable_auto_drop) {
m_dtr.pin(cmd.inputs);
}
for (auto i : cmd.inputs) {
if (!i->ptr && i->evict_type != EvictType::NONE) {
regenerate(i);
}
m_dtr.update_used_time(i);
}
tensor_inputs.reserve(cmd.inputs.size());
// refcnt == 1, owners: [TensorInfo::ptr]
for (auto i : cmd.inputs) {
......@@ -569,6 +667,9 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
m_worker_state.profiler->record_device<DeviceOpExecuteEvent>(device, event_data);
}
}
if (m_worker_state.options.enable_auto_drop && m_worker_state.options.memory_budget > 0) {
auto_evict();
}
// Apply op
// Here std::move is REQUIRED for removing duplicated references.
auto tensor_outputs = OpDef::apply_on_physical_tensor(
......@@ -581,16 +682,78 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
}
}
// End profiling operator
double estimate_compute_time = 0;
if (m_worker_state.options.enable_auto_drop) {
for (auto i : cmd.inputs) {
estimate_compute_time += i->memory;
}
for (auto i : tensor_outputs) {
estimate_compute_time += i->blob()->size();
}
m_dtr.estimate_timestamp += estimate_compute_time / 1e8;
for (auto i : cmd.outputs) {
i->compute_time = estimate_compute_time;
m_dtr.update_used_time(i);
}
if (cmd.outputs[0]->producer) {
cmd.outputs[0]->producer->compute_time = estimate_compute_time;
}
m_dtr.unpin(cmd.inputs);
}
mgb_assert(tensor_outputs.size() == cmd.outputs.size());
for (size_t i = 0; i < tensor_outputs.size(); ++i) {
if (cmd.outputs[i] == nullptr) {
continue;
}
produce_tensor(cmd.outputs[i], std::move(tensor_outputs[i]));
if (m_worker_state.options.enable_auto_drop) {
cmd.outputs[i]->dsu_ptr = std::make_shared<DsuNode>(estimate_compute_time);
}
}
if (m_worker_state.options.enable_drop == 1
&& m_worker_state.options.record_computing_path == 1){
bool is_inplace = false;
bool cross_cn = false;
for (auto input : cmd.inputs) {
for (auto output : cmd.outputs) {
if (input->ptr->blob()->storage() == output->ptr->blob()->storage()) {
is_inplace = true;
break;
}
}
}
for (auto input : cmd.inputs) {
if (input->ptr->comp_node() != m_dtr.comp_node) {
cross_cn = true;
break;
}
}
for (auto output : cmd.outputs) {
if (output->ptr->comp_node() != m_dtr.comp_node) {
cross_cn = true;
break;
}
}
if (!is_inplace && !cross_cn) {
TensorInfo::ComputePath::make(cmd.op, cmd.inputs, cmd.outputs);
size_t detach_cnt = 0;
for (auto output : cmd.outputs) {
if (!output->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) {
output->detach_producer();
detach_cnt ++;
}
}
for (auto input : cmd.inputs) {
input->ref_cnt -= detach_cnt;
}
}
}
} else if constexpr (std::is_same_v<T, Del>) {
free(cmd.dest);
} else if constexpr (std::is_same_v<T, GetValue>) {
if (!cmd.dest->ptr && cmd.dest->evict_type != EvictType::NONE) {
regenerate(cmd.dest);
}
mgb_assert(cmd.dest->ptr, "Invalid tensor ptr!");
cmd.dest->ptr->fetch_value();
MGB_LOCK_GUARD(m_mutex);
......@@ -602,9 +765,12 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
produce_tensor(cmd.dest, Tensor::make(cmd.dest->h_value));
} else if constexpr (std::is_same_v<T, SwapOut>) {
cmd.dest->h_value = cmd.dest->ptr->get_value();
release_tensor(cmd.dest);
if (cmd.dest->evict_type == EvictType::NONE) {
release_tensor(cmd.dest);
cmd.dest->evict_type = EvictType::SWAP;
}
} else if constexpr (std::is_same_v<T, Drop>) {
release_tensor(cmd.dest);
do_drop(cmd.dest, true);
} else if constexpr (std::is_same_v<T, SetOption>) {
m_worker_state.options.set_option(cmd.key, cmd.value);
} else if constexpr (std::is_same_v<T, StartProfile>) {
......@@ -833,3 +999,111 @@ void ChannelImpl::assert_in_channel() {
void ChannelImpl::assert_in_worker() {
mgb_assert(m_worker_state.tid == std::this_thread::get_id());
}
void ChannelImpl::DynamicSublinear::pin(const SmallVector<TensorInfo*>& vec) {
for (auto i : vec) {
i->pin();
}
}
void ChannelImpl::DynamicSublinear::unpin(const SmallVector<TensorInfo*>& vec) {
for (auto i : vec) {
i->unpin();
}
}
void ChannelImpl::DynamicSublinear::update_dsu_after_recompute(TensorInfo* ptr) {
auto&& dsu_fa = find_father(ptr->dsu_ptr);
dsu_fa->t -= ptr->compute_time;
ptr->dsu_ptr->parent.reset();
ptr->dsu_ptr->t = ptr->compute_time;
}
void ChannelImpl::DynamicSublinear::update_dsu_after_evict(TensorInfo* ptr) {
for (auto i : ptr->producer->inputs) {
if (i->evict_type == EvictType::DROP) {
merge(i->dsu_ptr, ptr->dsu_ptr);
}
}
for (auto i : ptr->producer->outputs) {
if (i && i->evict_type == EvictType::DROP) {
merge(ptr->dsu_ptr, i->dsu_ptr);
}
}
}
double ChannelImpl::DynamicSublinear::estimate_neighbor_cost(TensorInfo* ptr) {
double cost = 0;
for (auto i : ptr->producer->inputs) {
if (i->evict_type == EvictType::DROP) {
double t = find_father(i->dsu_ptr)->t;
if (t < i->compute_time) {
t = i->compute_time;
}
cost += t;
}
}
for (auto i : ptr->producer->outputs) {
if (i && i->evict_type == EvictType::DROP) {
double t = find_father(i->dsu_ptr)->t;
if (t < i->compute_time) {
t = i->compute_time;
}
cost += t;
}
}
return cost;
}
TensorInfo* ChannelImpl::DynamicSublinear::find_best_tensor() {
double min_msps = -1;
TensorInfo* best = nullptr;
for (auto i : candidates) {
if (i->producer && i->ptr && !i->pinned && i->evict_type == EvictType::NONE) {
double neighbor_cost = estimate_neighbor_cost(i);
size_t begin_ptr = reinterpret_cast<size_t>(i->ptr->blob()->storage().get());
auto side_info = i->ptr->comp_node().get_free_left_and_right(begin_ptr, begin_ptr + i->ptr->blob()->size());
double free_mem = side_info.first + side_info.second;
double msps = i->eval_func(neighbor_cost, free_mem, estimate_timestamp, 1.0, 1.0, 1.0, 1.0001);
if (min_msps < 0 || msps < min_msps) {
min_msps = msps;
best = i;
}
}
}
return best;
}
void ChannelImpl::DynamicSublinear::merge(std::shared_ptr<DsuNode> &x, std::shared_ptr<DsuNode> &y) {
auto&& f_x = find_father(x);
auto&& f_y = find_father(y);
if (f_x.get() == f_y.get()) {
return;
}
f_y->t += f_x->t;
f_x->parent = f_y;
}
std::shared_ptr<DsuNode> ChannelImpl::DynamicSublinear::find_father(std::shared_ptr<DsuNode>& x) {
if (x->is_root()) {
return x;
} else {
auto&& fa = find_father(x->parent);
return x->parent = fa;
}
}
void ChannelImpl::DynamicSublinear::insert_candidate(TensorInfo* ptr) {
candidates.insert(ptr);
if (!comp_node.valid()) {
comp_node = ptr->ptr->comp_node();
}
}
void ChannelImpl::DynamicSublinear::erase_candidate(TensorInfo* ptr) {
candidates.erase(ptr);
}
void ChannelImpl::DynamicSublinear::update_used_time(TensorInfo* ptr) {
ptr->last_used_time = estimate_timestamp;
}
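To make the DSU bookkeeping above concrete, here is a small worked example with illustrative numbers: `merge` accumulates costs into the root, and `update_dsu_after_recompute` removes a recomputed tensor's cost from its old component, so a root always stores the total recomputation time of its evicted component.

\[
T_{\{a,b\}} = t_a + t_b = 2 + 5 = 7, \qquad
\text{after recomputing } a:\; T_{\{b\}} = 7 - 2 = 5,\quad T_{\{a\}} = t_a = 2.
\]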
......@@ -63,8 +63,8 @@ struct ChannelImpl : Interpreter::Channel {
void sync() override;
void close() override;
int get_option(std::string name) override;
void set_option(std::string name, int value) override;
size_t get_option(std::string name) override;
void set_option(std::string name, size_t value) override;
void start_profile(std::unordered_map<std::string, int> option) override;
void stop_profile(std::string basename, std::string format) override;
......@@ -74,18 +74,22 @@ struct ChannelImpl : Interpreter::Channel {
private:
TensorInfo* alloc();
void free(TensorInfo*);
void real_free(TensorInfo*);
void recursive_free(TensorInfo*);
void do_drop(TensorInfo*, bool);
void detach_users(TensorInfo*);
void process_one_task(IdentifiedCommand&);
void check_worker_exc_unsafe();
void produce_tensor(TensorInfo* dest, TensorPtr ptr);
void produce_tensor(TensorInfo* dest, TensorPtr ptr, bool notice);
void release_tensor(TensorInfo* dest);
void regenerate(TensorInfo* dest);
void recompute(TensorInfo::ComputePath* path);
void dispatch_default_cpu(
std::shared_ptr<OpDef> op,
......@@ -180,7 +184,6 @@ private:
//! level 1: user side errors are sync;
//! level 0: both sync.
int m_async_level = 2;
int m_max_recompute_time = 1;
struct State {
std::thread::id tid;
......@@ -201,6 +204,112 @@ private:
ChannelState m_channel_state;
WorkerState m_worker_state;
/*!
* \brief A framework for dynamic sublinear memory optimization
*
* Note: The main idea is that, during training, whenever memory usage
* exceeds the threshold, some tensors are selected and evicted until
* memory usage falls below the threshold.
*/
struct DynamicSublinear {
/*!
* \brief find the available tensor with the smallest evaluation-function value
*
* Note: An available tensor must satisfy: (1) it has a computing path,
* (2) it is in memory, and (3) it is not pinned. For the evaluation function,
* @see: TensorInfo::eval_func.
*
* \return the pointer of the best tensor; nullptr is returned if no
* available tensor is found
*/
TensorInfo* find_best_tensor();
/*!
* \brief estimate the cost of recomputing tensor ptr
*
* Note: We define the cost as the sum of the costs of the evicted
* components in which the neighbors of ptr are located.
*/
double estimate_neighbor_cost(TensorInfo* ptr);
/*!
* \brief update the last used time of the tensor ptr
*/
void update_used_time(TensorInfo* ptr);
/*!
* \brief merge the two specified sets (the set in which the element x
* is located, and the set in which the element y is located)
*/
void merge(std::shared_ptr<DsuNode> &x, std::shared_ptr<DsuNode> &y);
/*!
* \brief return the representative of the set that contains the
* element x
*/
std::shared_ptr<DsuNode> find_father(std::shared_ptr<DsuNode> &x);
/*!
* \brief update DSU after recomputing tensor ptr
*
* Delete ptr from the set it belongs to. Since a DSU does not support
* deletion, we instead reset the DSU father of ptr and subtract the
* recomputation cost of ptr from the cost of the original set.
*/
void update_dsu_after_recompute(TensorInfo* ptr);
/*!
* \brief update DSU after evicting tensor ptr
*
* Check the neighbors of ptr, that is, its input and output tensors; if
* they have been evicted, merge their respective sets.
*/
void update_dsu_after_evict(TensorInfo* ptr);
/*!
* \brief pin the tensors in vec
*/
void pin(const SmallVector<TensorInfo*>& vec);
/*!
* \brief unpin the tensors in vec
*/
void unpin(const SmallVector<TensorInfo*>& vec);
/*!
* \brief add the tensor to the candidate set
*
* If the size of the tensor does not exceed the minimum threshold,
* it will do nothing.
*/
void insert_candidate(TensorInfo* ptr);
/*!
* \brief erase the tensor from the candidate set
*
* If the size of the tensor does not exceed the minimum threshold,
* it will do nothing.
*/
void erase_candidate(TensorInfo* ptr);
//! estimated current time, maintained to reduce the overhead of querying a real timer
double estimate_timestamp = 0;
//! the comp node where dynamic sublinear memory optimization works
CompNode comp_node;
//! store all tensors that may be evicted
std::unordered_set<TensorInfo*> candidates;
//! whether the warning message has been printed
bool warn_printed = false;
} m_dtr;
//! automatically evict an optimal tensor
void auto_evict();
};
} // namespace mgb::imperative::interpreter::intl
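Written out, estimate_neighbor_cost sums, over the dropped neighbors of a candidate, the cost of each neighbor's evicted component, floored by that neighbor's own compute time. The symbol names below are ours, chosen for readability:

\[
\mathrm{cost}(t) \;=\; \sum_{n \,\in\, N_{\mathrm{drop}}(t)} \max\!\bigl(T_{\mathrm{comp}(n)},\; t_{n}\bigr)
\]

where \(N_{\mathrm{drop}}(t)\) is the set of dropped inputs and outputs of the operator that produced \(t\), \(\mathrm{comp}(n)\) is the evicted component containing \(n\) as tracked by the DSU, \(T_{\mathrm{comp}(n)}\) its accumulated recomputation time, and \(t_n\) the compute time of the operator that produced \(n\).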
......@@ -20,10 +20,10 @@ namespace mgb::imperative::interpreter::intl {
struct OptionManager {
private:
std::unordered_map<std::string, int*> m_option_map = {};
std::unordered_map<std::string, size_t*> m_option_map = {};
public:
#define DEF_OPTION(name, env_key, default_value, desc) \
int name = (m_option_map[#name]=&name, get_option_from_env(env_key, default_value));
size_t name = (m_option_map[#name]=&name, get_option_from_env(env_key, default_value));
DEF_OPTION(async_level, "MEGENGINE_INTERP_ASYNC_LEVEL", 2,
"config whether raise error exactly when invoking op.\n"
......@@ -39,20 +39,26 @@ public:
"set command buffer length.");
DEF_OPTION(enable_host_compute, "MEGENGINE_HOST_COMPUTE", 1,
"enable host compute, thus computation may be done in host event if it's device is gpu.");
DEF_OPTION(enable_auto_drop, "MEGENGINE_AUTO_DROP", 0,
"automatically drop (evict) tensors when memory usage exceeds memory_budget.");
DEF_OPTION(memory_budget, "MEGENGINE_MEMORY_BUDGET", 0,
"auto drop will start whenever gpu memory usage exceeds this value.");
DEF_OPTION(tensor_lowerbound, "MEGENGINE_TENSOR_LOWERBOUND", 1048576,
"the minimum memory value of a tensor added to the candidate set");
DEF_OPTION(record_computing_path, "MEGENGINE_RECORD_COMPUTING_PATH", 0,
"record the computing path of tensors so that dropped tensors can be regenerated.");
#undef DEF_OPTION
void set_option(const std::string& name, int value) {
void set_option(const std::string& name, size_t value) {
*m_option_map[name] = value;
}
int get_option(const std::string& name) const {
size_t get_option(const std::string& name) const {
return *m_option_map.at(name);
}
static int get_option_from_env(const std::string& name, int default_value) {
static size_t get_option_from_env(const std::string& name, size_t default_value) {
if (const char* env_val = MGB_GETENV(name.c_str())) {
default_value = std::atoi(env_val);
sscanf(env_val, "%zu", &default_value);
}
return default_value;
}
......
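The new options can also be driven from Python through the widened set_option / get_option bindings (or seeded via the environment variables named above). The calls below mirror what megengine.utils.dtr.DTR performs; the 4 GB budget is only an example value:

```python
from megengine.core._imperative_rt.core2 import get_option, set_option
from megengine.core._imperative_rt.utils import _set_defrag

# roughly equivalent to DTR(memory_budget=4 * 1024 ** 3)
set_option("enable_auto_drop", 1)
set_option("enable_drop", 1)
set_option("buffer_length", 0)
set_option("memory_budget", 4 * 1024 ** 3)   # MEGENGINE_MEMORY_BUDGET
set_option("tensor_lowerbound", 1 << 20)     # MEGENGINE_TENSOR_LOWERBOUND, 1 MB
set_option("record_computing_path", 1)       # MEGENGINE_RECORD_COMPUTING_PATH
_set_defrag(True)

assert get_option("memory_budget") == 4 * 1024 ** 3
```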
......@@ -25,6 +25,24 @@ enum EvictType {
DROP = 2,
};
/*!
* \brief an identifier to specify a component of evicted tensors
*
* Each component tracks the sum of the compute costs of its elements; when
* two components are merged, their costs are added together.
*/
struct DsuNode {
DsuNode(double _t): t(_t) {}
std::shared_ptr<DsuNode> parent;
bool is_root() {
return !bool(parent);
}
double t;
};
struct TensorInfo;
using TensorInfoPtr = std::shared_ptr<TensorInfo>;
......@@ -37,6 +55,10 @@ struct TensorInfo {
TensorPtr ptr;
LogicalTensorDesc desc;
double compute_time;
size_t memory;
double last_used_time;
// FIXME: broken by drop
bool value_fetched = false;
bool invalid = false;
......@@ -49,12 +71,15 @@ struct TensorInfo {
// reserved for auto drop
size_t pinned = 0;
size_t recompute_times = 0;
size_t ref_cnt = 0;
std::shared_ptr<DsuNode> dsu_ptr;
struct ComputePath {
std::shared_ptr<OpDef> op;
SmallVector<TensorInfo*> inputs;
SmallVector<TensorInfo*> unique_inputs;
SmallVector<TensorInfo*> outputs;
double compute_time = 0;
size_t ref_cnt() {
return outputs.size() - std::count(outputs.begin(), outputs.end(), nullptr);
......@@ -78,9 +103,19 @@ struct TensorInfo {
for (auto output: outputs) {
output->producer = path;
}
// update ref_cnt
for (auto input: inputs) {
input->ref_cnt += outputs.size();
}
return path;
}
}* producer = nullptr;
double eval_func(double cost, double free_mem, double cur_time,
double param_cost, double param_mem, double param_time, double param_recompute_times) {
return pow(cost + 1e-3, param_cost) * pow(param_recompute_times, (double)recompute_times)
/ (pow((memory + free_mem) / 1024.0 / 1024.0, param_mem) * pow((double)(cur_time - last_used_time + 1e-3), param_time));
}
void pin() {
++pinned;
......@@ -106,6 +141,10 @@ struct TensorInfo {
producer = nullptr;
}
bool size_exceeds_thd(size_t thd) {
return memory > thd;
}
SmallVector<ComputePath*> users;
};
}
......
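For readability, the heuristic computed by eval_func (the score minimized by DynamicSublinear::find_best_tensor) can be written as below; the symbol names are ours, and the parameter values are those passed in this commit (\(p_c = p_m = p_t = 1\), \(p_r = 1.0001\)):

\[
\mathrm{msps}(t) \;=\;
\frac{\bigl(\mathrm{cost}(t) + 10^{-3}\bigr)^{p_c}\; p_r^{\,\mathrm{recompute\_times}(t)}}
{\Bigl(\dfrac{\mathrm{memory}(t) + \mathrm{free\_mem}(t)}{1024^2}\Bigr)^{p_m}\,\bigl(\tau - \mathrm{last\_used}(t) + 10^{-3}\bigr)^{p_t}}
\]

where \(\mathrm{cost}(t)\) is the neighbor recomputation cost from estimate_neighbor_cost, \(\mathrm{free\_mem}(t)\) is the free device memory adjacent to \(t\)'s storage, \(\tau\) is the estimated current timestamp, and \(\mathrm{recompute\_times}(t)\) counts how often \(t\) has already been recomputed. Tensors that are cheap to recompute, large, and long unused therefore receive the lowest score and are evicted first.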
......@@ -44,8 +44,8 @@ struct Interpreter {
virtual void sync() = 0;
virtual void close() = 0;
virtual int get_option(std::string name) = 0;
virtual void set_option(std::string name, int value) = 0;
virtual size_t get_option(std::string name) = 0;
virtual void set_option(std::string name, size_t value) = 0;
virtual void start_profile(std::unordered_map<std::string, int> option) = 0;
virtual void stop_profile(std::string basename, std::string format) = 0;
......