diff --git a/imperative/python/megengine/autodiff/grad_manager.py b/imperative/python/megengine/autodiff/grad_manager.py
index 70cbc5b7053eccd487f37f4f797360137223158d..3d569795941273d25910c7c64e4a61efa20e93ad 100644
--- a/imperative/python/megengine/autodiff/grad_manager.py
+++ b/imperative/python/megengine/autodiff/grad_manager.py
@@ -3,7 +3,7 @@
 from collections import defaultdict
 from contextlib import contextmanager
 from typing import Callable
-from ..core._imperative_rt.core2 import pop_scope, push_scope
+from ..core._imperative_rt.core2 import pop_scope, push_scope, set_option
 from ..core.autodiff.grad import Grad
 from ..logger import get_logger
 from ..tensor import Tensor
@@ -241,6 +241,7 @@ class GradManager:
         :param dy: tensor or list of tensors. Defaults to 1 if y is scalar
         """
         push_scope("backward")
+        set_option("record_computing_path", 0)
         from ..functional import ones_like
         global backwarding_grad_manager
@@ -284,6 +285,7 @@ class GradManager:
         finally:
             self.release()
             backwarding_grad_manager = cache
+            set_option("record_computing_path", 1)
             pop_scope("backward")
 
     def record(self):
diff --git a/imperative/python/megengine/optimizer/optimizer.py b/imperative/python/megengine/optimizer/optimizer.py
index cea3e49dcbfea47681955855dc3adabca5cfa1de..8b2d485869fb2df23c62031db2425ae10aa96cfd 100644
--- a/imperative/python/megengine/optimizer/optimizer.py
+++ b/imperative/python/megengine/optimizer/optimizer.py
@@ -15,7 +15,7 @@ from typing import Union
 import numpy as np
-from ..core._imperative_rt.core2 import pop_scope, push_scope
+from ..core._imperative_rt.core2 import pop_scope, push_scope, set_option
 from ..core.tensor.utils import set_convert_inputs
 from ..tensor import Parameter, Tensor
 from ..utils.deprecation import deprecated
@@ -148,6 +148,7 @@ class Optimizer(metaclass=ABCMeta):
         """
         # set the globle state `_enable_convert_inputs` to `False` to disable
         # the `convert_inputs` for param updates
+        set_option("record_computing_path", 0)
         backup = set_convert_inputs(False)
         for group in self.param_groups:
             if isinstance(group["params"], set):
@@ -161,6 +162,7 @@ class Optimizer(metaclass=ABCMeta):
             pop_scope("step")
         # restore the globle state `_enable_convert_inputs`
         set_convert_inputs(backup)
+        set_option("record_computing_path", 1)
         return self
 
     @deprecated(version="1.0", reason="use clear_grad instead")
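
The two Python-side changes above wrap gradient computation and the optimizer update in set_option("record_computing_path", ...) calls, so tensors produced while applying gradients do not get recompute paths recorded for DTR. A minimal sketch of that toggling pattern, not part of the patch (the helper name and the wrapped callable are hypothetical; only set_option comes from the module imported above):

    from megengine.core._imperative_rt.core2 import set_option

    def run_without_path_recording(fn):
        # Hypothetical helper: temporarily stop recording computing paths for
        # tensors created inside fn(), then re-enable recording afterwards.
        set_option("record_computing_path", 0)
        try:
            fn()
        finally:
            set_option("record_computing_path", 1)
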
diff --git a/imperative/python/megengine/utils/dtr.py b/imperative/python/megengine/utils/dtr.py
new file mode 100644
index 0000000000000000000000000000000000000000..5b3c4a8d292a31fd395d6f47504ea3161240c9b8
--- /dev/null
+++ b/imperative/python/megengine/utils/dtr.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
+#
+# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+
+from ..core._imperative_rt.core2 import set_option
+from ..core._imperative_rt.utils import _set_defrag
+
+
+class DTR:
+    r"""
+    DTR implements `Dynamic Tensor Rematerialization `_ in MegEngine.
+
+    It is basically an online algorithm for checkpointing driven by certain eviction policies.
+
+    .. code-block::
+
+        from megengine.utils.dtr import DTR
+
+        ds = DTR(memory_budget=5*1024**3)
+
+        # your training code
+
+    """
+
+    def __init__(self, memory_budget=0, tensor_lowerbound=1048576):
+        r"""
+        :param memory_budget: int. The threshold of memory usage. When memory
+            usage exceeds this value, auto evict will be triggered.
+        :param tensor_lowerbound: int. The minimum memory limit of the tensor
+            that can be evicted. Default: 1MB.
+        """
+        if memory_budget > 0:
+            set_option("enable_auto_drop", 1)
+            set_option("enable_drop", 1)
+            set_option("buffer_length", 0)
+            set_option("memory_budget", memory_budget)
+            set_option("tensor_lowerbound", tensor_lowerbound)
+            set_option("record_computing_path", 1)
+            _set_defrag(True)
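
A rough end-to-end sketch of how the DTR helper above might be wired into a training script. The model builder and data loader are hypothetical placeholders; the DTR, GradManager and SGD calls follow the public MegEngine APIs used elsewhere in this patch:

    import megengine.autodiff as ad
    import megengine.optimizer as optim
    from megengine.utils.dtr import DTR

    # Enable auto-evict with a ~5 GiB budget before any large tensors exist.
    DTR(memory_budget=5 * 1024 ** 3)

    net = build_model()                  # hypothetical model builder
    opt = optim.SGD(net.parameters(), lr=0.01)
    gm = ad.GradManager().attach(net.parameters())

    for data, label in train_loader:     # hypothetical data loader
        with gm:
            loss = net(data, label)
            gm.backward(loss)
        opt.step().clear_grad()
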
"megbrain/imperative/utils/to_string.h" -#include "../op_trait.h" - using namespace mgb; using namespace imperative; using namespace interpreter; @@ -61,8 +60,6 @@ Handle ChannelImpl::put(const DeviceTensorND& data) { void ChannelImpl::del(Handle handle) { mgb_assert(m_valid_handle.count(handle), "invalid handle: %p", handle); auto* info = reinterpret_cast(handle); - detach_users(info); - info->detach_producer(); m_valid_handle.erase(handle); m_buffer.enqueue(Del{info}); } @@ -73,7 +70,6 @@ void ChannelImpl::swap_in(Handle handle) { "invalid handle: %p", handle); auto* info = reinterpret_cast(handle); m_buffer.enqueue(SwapIn{info}); - info->evict_type = NONE; } } @@ -83,7 +79,6 @@ void ChannelImpl::swap_out(Handle handle) { "invalid handle: %p", handle); auto* info = reinterpret_cast(handle); m_buffer.enqueue(SwapOut{info}); - info->evict_type = SWAP; } } @@ -92,11 +87,6 @@ void ChannelImpl::drop(Handle handle) { mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(), "invalid handle: %p", handle); auto* info = reinterpret_cast(handle); - if (!info->producer) { - mgb_log_warn("the input that produced tensor %p has been deleted, this drop operation will be ignored", info); - return; - } - info->evict_type = DROP; m_buffer.enqueue(Drop{info}); } } @@ -167,10 +157,6 @@ void ChannelImpl::dispatch_default_cpu( outputs->push_back(info); } - if (m_channel_state.options.enable_drop) { - TensorInfo::ComputePath::make(op, input_infos, output_infos); - } - event_data.outputs = tinfo_to_tid(output_infos); if (m_channel_state.profiler->is_profiling()) { m_channel_state.profiler->record_host(event_data); @@ -199,9 +185,6 @@ void ChannelImpl::dispatch_kernel( cmd.outputs.push_back(info); outputs->push_back(info); } - if (m_channel_state.options.enable_drop) { - TensorInfo::ComputePath::make(cmd.op, cmd.inputs, cmd.outputs); - } m_buffer.enqueue(std::move(cmd)); if (!validated && m_channel_state.options.async_level == 1) { sync(); @@ -233,7 +216,6 @@ SmallVector ChannelImpl::apply_op( mgb_assert(!info->invalid, "Invalid tensor, unable to apply_op!"); input_infos.push_back(info); input_descs.push_back(info->desc); - regenerate(info); } } @@ -269,7 +251,6 @@ HostTensorND ChannelImpl::get_value(Handle handle) { }; if (!value_fetched()) { m_waitee = info; - regenerate(info); m_buffer.enqueue(GetValue{info}); if (m_channel_state.profiler->is_profiling()) { m_channel_state.profiler->record_host(info->id, TensorInfo::HostValue); @@ -345,7 +326,6 @@ DeviceTensorND ChannelImpl::get_dev_tensor(Handle handle) { std::unique_lock lock(m_mutex); mgb_assert(!m_waitee); m_waitee = info; - regenerate(info); m_buffer.flush(); if (m_channel_state.profiler->is_profiling()) { m_channel_state.profiler->record_host(info->id, TensorInfo::DevValue); @@ -379,11 +359,11 @@ void ChannelImpl::close() { sync(); } -int ChannelImpl::get_option(std::string name) { +size_t ChannelImpl::get_option(std::string name) { return m_channel_state.options.get_option(name); } -void ChannelImpl::set_option(std::string name, int value) { +void ChannelImpl::set_option(std::string name, size_t value) { m_channel_state.options.set_option(name, value); m_buffer.enqueue(SetOption{name, value}); } @@ -399,11 +379,64 @@ TensorInfo* ChannelImpl::alloc() { return info; } + +void ChannelImpl::do_drop(TensorInfo* ptr, bool user=false) { + if (!ptr->producer) { + if (user) { + mgb_log_warn("the input that produced tensor %p has been deleted, this drop operation will be ignored", ptr); + } + return; + } + if (ptr->evict_type != EvictType::NONE) { + 
diff --git a/imperative/src/impl/interpreter/interpreter_impl.cpp b/imperative/src/impl/interpreter/interpreter_impl.cpp
index dd8e7d8ea99aff5baf68a109cc338d693d811bc7..50f649f138ea981b386607ee6a3c830a2254bd8c 100644
--- a/imperative/src/impl/interpreter/interpreter_impl.cpp
+++ b/imperative/src/impl/interpreter/interpreter_impl.cpp
@@ -13,12 +13,11 @@
 #include "megbrain/common.h"
 #include "megbrain/imperative/opr_utility.h"
-#include "megbrain/imperative/ops/backward_graph.h"
 #include "megbrain/imperative/ops/autogen.h"
+#include "megbrain/imperative/ops/backward_graph.h"
+#include "megbrain/imperative/ops/opr_attr.h"
 #include "megbrain/imperative/utils/to_string.h"
-#include "../op_trait.h"
-
 using namespace mgb;
 using namespace imperative;
 using namespace interpreter;
@@ -61,8 +60,6 @@ Handle ChannelImpl::put(const DeviceTensorND& data) {
 void ChannelImpl::del(Handle handle) {
     mgb_assert(m_valid_handle.count(handle), "invalid handle: %p", handle);
     auto* info = reinterpret_cast<TensorInfo*>(handle);
-    detach_users(info);
-    info->detach_producer();
     m_valid_handle.erase(handle);
     m_buffer.enqueue(Del{info});
 }
@@ -73,7 +70,6 @@ void ChannelImpl::swap_in(Handle handle) {
                 "invalid handle: %p", handle);
         auto* info = reinterpret_cast<TensorInfo*>(handle);
         m_buffer.enqueue(SwapIn{info});
-        info->evict_type = NONE;
     }
 }
@@ -83,7 +79,6 @@ void ChannelImpl::swap_out(Handle handle) {
                 "invalid handle: %p", handle);
         auto* info = reinterpret_cast<TensorInfo*>(handle);
         m_buffer.enqueue(SwapOut{info});
-        info->evict_type = SWAP;
     }
 }
@@ -92,11 +87,6 @@ void ChannelImpl::drop(Handle handle) {
         mgb_assert(m_valid_handle.find(handle) != m_valid_handle.end(),
                 "invalid handle: %p", handle);
         auto* info = reinterpret_cast<TensorInfo*>(handle);
-        if (!info->producer) {
-            mgb_log_warn("the input that produced tensor %p has been deleted, this drop operation will be ignored", info);
-            return;
-        }
-        info->evict_type = DROP;
         m_buffer.enqueue(Drop{info});
     }
 }
@@ -167,10 +157,6 @@ void ChannelImpl::dispatch_default_cpu(
         outputs->push_back(info);
     }
-    if (m_channel_state.options.enable_drop) {
-        TensorInfo::ComputePath::make(op, input_infos, output_infos);
-    }
-
     event_data.outputs = tinfo_to_tid(output_infos);
     if (m_channel_state.profiler->is_profiling()) {
         m_channel_state.profiler->record_host(event_data);
     }
@@ -199,9 +185,6 @@ void ChannelImpl::dispatch_kernel(
         cmd.outputs.push_back(info);
         outputs->push_back(info);
     }
-    if (m_channel_state.options.enable_drop) {
-        TensorInfo::ComputePath::make(cmd.op, cmd.inputs, cmd.outputs);
-    }
     m_buffer.enqueue(std::move(cmd));
     if (!validated && m_channel_state.options.async_level == 1) {
         sync();
@@ -233,7 +216,6 @@ SmallVector<Handle> ChannelImpl::apply_op(
             mgb_assert(!info->invalid, "Invalid tensor, unable to apply_op!");
             input_infos.push_back(info);
             input_descs.push_back(info->desc);
-            regenerate(info);
         }
     }
@@ -269,7 +251,6 @@ HostTensorND ChannelImpl::get_value(Handle handle) {
     };
     if (!value_fetched()) {
         m_waitee = info;
-        regenerate(info);
         m_buffer.enqueue(GetValue{info});
         if (m_channel_state.profiler->is_profiling()) {
             m_channel_state.profiler->record_host(info->id, TensorInfo::HostValue);
         }
@@ -345,7 +326,6 @@ DeviceTensorND ChannelImpl::get_dev_tensor(Handle handle) {
     std::unique_lock<std::mutex> lock(m_mutex);
     mgb_assert(!m_waitee);
     m_waitee = info;
-    regenerate(info);
     m_buffer.flush();
     if (m_channel_state.profiler->is_profiling()) {
         m_channel_state.profiler->record_host(info->id, TensorInfo::DevValue);
     }
@@ -379,11 +359,11 @@ void ChannelImpl::close() {
     sync();
 }
-int ChannelImpl::get_option(std::string name) {
+size_t ChannelImpl::get_option(std::string name) {
     return m_channel_state.options.get_option(name);
 }
-void ChannelImpl::set_option(std::string name, int value) {
+void ChannelImpl::set_option(std::string name, size_t value) {
     m_channel_state.options.set_option(name, value);
     m_buffer.enqueue(SetOption{name, value});
 }
@@ -399,11 +379,64 @@ TensorInfo* ChannelImpl::alloc() {
     return info;
 }
+
+void ChannelImpl::do_drop(TensorInfo* ptr, bool user=false) {
+    if (!ptr->producer) {
+        if (user) {
+            mgb_log_warn("the input that produced tensor %p has been deleted, this drop operation will be ignored", ptr);
+        }
+        return;
+    }
+    if (ptr->evict_type != EvictType::NONE) {
+        return;
+    }
+    ptr->evict_type = EvictType::DROP;
+    release_tensor(ptr);
+}
+
 void ChannelImpl::free(TensorInfo* ptr) {
+    if (m_worker_state.options.enable_auto_drop) {
+        // Evicting a tensor, rather than freeing it, can avoid pinning
+        // potentially exploding amounts of memory and allow us to save
+        // more memory.
+        ptr->allow_delete = true;
+        if (!ptr->ref_cnt) {
+            recursive_free(ptr);
+        } else {
+            do_drop(ptr);
+        }
+    } else {
+        real_free(ptr);
+    }
+}
+
+void ChannelImpl::recursive_free(TensorInfo* ptr) {
+    SmallVector<TensorInfo*> inps(0);
+    if (ptr->producer) {
+        for (auto i : ptr->producer->inputs) {
+            if (i && --i->ref_cnt == 0) {
+                inps.push_back(i);
+            }
+        }
+    }
+    real_free(ptr);
+    for (auto i : inps) {
+        if (i->allow_delete) {
+            recursive_free(i);
+        }
+    }
+}
+
+void ChannelImpl::real_free(TensorInfo* ptr) {
     MGB_LOCK_GUARD(m_mutex);
     if (m_channel_state.profiler->is_profiling()) {
         m_channel_state.profiler->record_host(ptr->id);
     }
+    if (ptr->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) {
+        m_dtr.erase_candidate(ptr);
+    }
+    detach_users(ptr);
+    ptr->detach_producer();
     m_pool.free(ptr);
 }
@@ -415,17 +448,24 @@
 ChannelImpl::~ChannelImpl() {
     close();
 }
-void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr) {
-    MGB_LOCK_GUARD(m_mutex);
-    if (m_worker_state.profiler->is_profiling()) {
+void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr, bool notice=true) {
+    auto lock = notice ? std::unique_lock<std::mutex>(m_mutex)
+                       : std::unique_lock<std::mutex>();
+    m_dtr.update_used_time(dest);
+    if (notice && m_worker_state.profiler->is_profiling()) {
         m_worker_state.profiler->record_host(dest->id, ptr->layout(), ptr->comp_node());
     }
     dest->value_fetched = ptr->value_fetched();
     // update tensor desc for static infer
     dest->desc.layout = ptr->layout();
     dest->desc.comp_node = ptr->comp_node();
+    dest->memory = ptr->blob()->size();
     dest->ptr = std::move(ptr);
-    if (m_waitee == dest) {
+    dest->evict_type = EvictType::NONE;
+    if (notice && dest->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) {
+        m_dtr.insert_candidate(dest);
+    }
+    if (notice && m_waitee == dest) {
         m_cv.notify_all();
     }
 }
@@ -436,37 +476,86 @@ void ChannelImpl::release_tensor(TensorInfo* dest) {
 }
 
 void ChannelImpl::regenerate(TensorInfo* dest) {
-    if (dest->evict_type == DROP) {
+    if (dest->evict_type == EvictType::DROP) {
         recompute(dest->producer);
-    } else if (dest->evict_type == SWAP) {
-        swap_in(dest);
+    } else if (dest->evict_type == EvictType::SWAP) {
+        produce_tensor(dest, Tensor::make(dest->h_value));
     }
-    mgb_assert(dest->evict_type == NONE);
 }
 
 void ChannelImpl::recompute(TensorInfo::ComputePath* path) {
-    SmallVector workspaces(path->outputs.size(), nullptr);
-    for (auto&& input: path->inputs) {
-        regenerate(input);
+    SmallVector<TensorPtr> inputs;
+    inputs.reserve(path->inputs.size());
+    m_dtr.pin(path->inputs);
+    for (auto i : path->inputs) {
+        if (!i->ptr) {
+            regenerate(i);
+        }
+        inputs.push_back(i->ptr);
+        m_dtr.update_used_time(i);
+    }
+    if (m_worker_state.options.enable_auto_drop && m_worker_state.options.memory_budget > 0) {
+        auto_evict();
+    }
+    auto outputs = OpDef::apply_on_physical_tensor(*path->op, inputs);
+    m_dtr.estimate_timestamp += path->compute_time / 1e8;
+    m_dtr.unpin(path->inputs);
+    for (size_t i = 0;i < outputs.size();i ++) {
+        auto&& o = path->outputs[i];
+        if (o) {
+            o->recompute_times ++;
+            if (!o->ptr) {
+                produce_tensor(o, std::move(outputs[i]), false);
+                if (m_worker_state.options.enable_auto_drop) {
+                    m_dtr.update_dsu_after_recompute(o);
+                }
+            }
+        }
     }
-    for (auto&& output: path->outputs) {
-        if(output == nullptr) {
-            continue;
+}
+
+void ChannelImpl::auto_evict() {
+    if (!m_dtr.comp_node.valid()) {
+        return;
+    }
+    size_t current_memory = m_dtr.comp_node.get_used_memory();
+    while (current_memory > m_worker_state.options.memory_budget) {
+        auto best = m_dtr.find_best_tensor();
+        if (!best) {
+            if (!m_dtr.warn_printed) {
+                m_dtr.warn_printed = true;
+                mgb_log_warn("No tensors on %s can be evicted automatically "
+                             "when memory usage is %.0lfMB. Maybe memory "
+                             "budget is too small.",
+                             m_dtr.comp_node.to_string().c_str(),
+                             current_memory / 1024.0 / 1024.0);
+            }
+            break;
+        }
+        if (best->ptr.unique() && best->ptr->blob().unique()) {
+            current_memory -= best->memory;
+        }
+        do_drop(best);
+        if (best->evict_type == EvictType::DROP) {
+            m_dtr.update_dsu_after_evict(best);
         }
-        output->evict_type = NONE;
     }
-    m_buffer.enqueue(ApplyOp{path->op, path->inputs, path->outputs});
 }
 
 void ChannelImpl::detach_users(TensorInfo* dest) {
     SmallVector<TensorInfo*> users = dest->users;
     for (auto* user: users) {
-        for (auto* output: user->outputs) {
+        SmallVector<TensorInfo*> outputs = user->outputs;
+        SmallVector<TensorInfo*> inputs = user->inputs;
+        for (auto* output: outputs) {
             if (output == nullptr) {
                 continue;
             }
             regenerate(output);
             output->detach_producer();
+            for (auto* input: inputs) {
+                input->ref_cnt --;
+            }
         }
     }
     mgb_assert(dest->users.size() == 0);
@@ -524,6 +613,15 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
             uint64_t apply_id = ++m_last_id;
             SmallVector<TensorPtr> tensor_inputs;
             SmallVector<CompNode> devices;
+            if (m_worker_state.options.enable_auto_drop) {
+                m_dtr.pin(cmd.inputs);
+            }
+            for (auto i : cmd.inputs) {
+                if (!i->ptr && i->evict_type != EvictType::NONE) {
+                    regenerate(i);
+                }
+                m_dtr.update_used_time(i);
+            }
             tensor_inputs.reserve(cmd.inputs.size());
             // refcnt == 1, owners: [TensorInfo::ptr]
             for (auto i : cmd.inputs) {
@@ -569,6 +667,9 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
                     m_worker_state.profiler->record_device(device, event_data);
                 }
             }
+            if (m_worker_state.options.enable_auto_drop && m_worker_state.options.memory_budget > 0) {
+                auto_evict();
+            }
             // Apply op
             // Here std::move is REQUIRED for removing duplicated references.
             auto tensor_outputs = OpDef::apply_on_physical_tensor(
@@ -581,16 +682,78 @@
             }
         }
         // End profiling operator
+        double estimate_compute_time = 0;
+        if (m_worker_state.options.enable_auto_drop) {
+            for (auto i : cmd.inputs) {
+                estimate_compute_time += i->memory;
+            }
+            for (auto i : tensor_outputs) {
+                estimate_compute_time += i->blob()->size();
+            }
+            m_dtr.estimate_timestamp += estimate_compute_time / 1e8;
+            for (auto i : cmd.outputs) {
+                i->compute_time = estimate_compute_time;
+                m_dtr.update_used_time(i);
+            }
+            if (cmd.outputs[0]->producer) {
+                cmd.outputs[0]->producer->compute_time = estimate_compute_time;
+            }
+            m_dtr.unpin(cmd.inputs);
+        }
         mgb_assert(tensor_outputs.size() == cmd.outputs.size());
         for (size_t i = 0; i < tensor_outputs.size(); ++i) {
             if (cmd.outputs[i] == nullptr) {
                 continue;
             }
             produce_tensor(cmd.outputs[i], std::move(tensor_outputs[i]));
+            if (m_worker_state.options.enable_auto_drop) {
+                cmd.outputs[i]->dsu_ptr = std::make_shared<DsuNode>(estimate_compute_time);
+            }
+        }
+        if (m_worker_state.options.enable_drop == 1
+                && m_worker_state.options.record_computing_path == 1){
+            bool is_inplace = false;
+            bool cross_cn = false;
+            for (auto input : cmd.inputs) {
+                for (auto output : cmd.outputs) {
+                    if (input->ptr->blob()->storage() == output->ptr->blob()->storage()) {
+                        is_inplace = true;
+                        break;
+                    }
+                }
+            }
+            for (auto input : cmd.inputs) {
+                if (input->ptr->comp_node() != m_dtr.comp_node) {
+                    cross_cn = true;
+                    break;
+                }
+            }
+            for (auto output : cmd.outputs) {
+                if (output->ptr->comp_node() != m_dtr.comp_node) {
+                    cross_cn = true;
+                    break;
+                }
+            }
+            if (!is_inplace && !cross_cn) {
+                TensorInfo::ComputePath::make(cmd.op, cmd.inputs, cmd.outputs);
+                size_t detach_cnt = 0;
+                for (auto output : cmd.outputs) {
+                    if (!output->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) {
+                        output->detach_producer();
+                        detach_cnt ++;
+                    }
+                }
+                for (auto input : cmd.inputs) {
+                    input->ref_cnt -= detach_cnt;
+                }
+            }
         }
     } else if constexpr (std::is_same_v<T, Del>) {
         free(cmd.dest);
     } else if constexpr (std::is_same_v<T, GetValue>) {
+        if (!cmd.dest->ptr && cmd.dest->evict_type != EvictType::NONE) {
+            regenerate(cmd.dest);
+        }
         mgb_assert(cmd.dest->ptr, "Invalid tensor ptr!");
         cmd.dest->ptr->fetch_value();
         MGB_LOCK_GUARD(m_mutex);
@@ -602,9 +765,12 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) {
         produce_tensor(cmd.dest, Tensor::make(cmd.dest->h_value));
     } else if constexpr (std::is_same_v<T, SwapOut>) {
         cmd.dest->h_value = cmd.dest->ptr->get_value();
-        release_tensor(cmd.dest);
+        if (cmd.dest->evict_type == EvictType::NONE) {
+            release_tensor(cmd.dest);
+            cmd.dest->evict_type = EvictType::SWAP;
+        }
     } else if constexpr (std::is_same_v<T, Drop>) {
-        release_tensor(cmd.dest);
+        do_drop(cmd.dest, true);
    } else if constexpr (std::is_same_v<T, SetOption>) {
         m_worker_state.options.set_option(cmd.key, cmd.value);
     } else if constexpr (std::is_same_v) {
@@ -833,3 +999,111 @@ void ChannelImpl::assert_in_channel() {
 void ChannelImpl::assert_in_worker() {
     mgb_assert(m_worker_state.tid == std::this_thread::get_id());
 }
+
+void ChannelImpl::DynamicSublinear::pin(const SmallVector<TensorInfo*>& vec) {
+    for (auto i : vec) {
+        i->pin();
+    }
+}
+
+void ChannelImpl::DynamicSublinear::unpin(const SmallVector<TensorInfo*>& vec) {
+    for (auto i : vec) {
+        i->unpin();
+    }
+}
+
+void ChannelImpl::DynamicSublinear::update_dsu_after_recompute(TensorInfo* ptr) {
+    auto&& dsu_fa = find_father(ptr->dsu_ptr);
+    dsu_fa->t -= ptr->compute_time;
+    ptr->dsu_ptr->parent.reset();
+    ptr->dsu_ptr->t = ptr->compute_time;
+}
+
+void ChannelImpl::DynamicSublinear::update_dsu_after_evict(TensorInfo* ptr) {
+    for (auto i : ptr->producer->inputs) {
+        if (i->evict_type == EvictType::DROP) {
+            merge(i->dsu_ptr, ptr->dsu_ptr);
+        }
+    }
+    for (auto i : ptr->producer->outputs) {
+        if (i && i->evict_type == EvictType::DROP) {
+            merge(ptr->dsu_ptr, i->dsu_ptr);
+        }
+    }
+}
+
+double ChannelImpl::DynamicSublinear::estimate_neighbor_cost(TensorInfo* ptr) {
+    double cost = 0;
+    for (auto i : ptr->producer->inputs) {
+        if (i->evict_type == EvictType::DROP) {
+            double t = find_father(i->dsu_ptr)->t;
+            if (t < i->compute_time) {
+                t = i->compute_time;
+            }
+            cost += t;
+        }
+    }
+    for (auto i : ptr->producer->outputs) {
+        if (i && i->evict_type == EvictType::DROP) {
+            double t = find_father(i->dsu_ptr)->t;
+            if (t < i->compute_time) {
+                t = i->compute_time;
+            }
+            cost += t;
+        }
+    }
+    return cost;
+}
+
+TensorInfo* ChannelImpl::DynamicSublinear::find_best_tensor() {
+    double min_msps = -1;
+    TensorInfo* best = nullptr;
+    for (auto i : candidates) {
+        if (i->producer && i->ptr && !i->pinned && i->evict_type == EvictType::NONE) {
+            double neighbor_cost = estimate_neighbor_cost(i);
+            size_t begin_ptr = reinterpret_cast<size_t>(i->ptr->blob()->storage().get());
+            auto side_info = i->ptr->comp_node().get_free_left_and_right(begin_ptr, begin_ptr + i->ptr->blob()->size());
+            double free_mem = side_info.first + side_info.second;
+            double msps = i->eval_func(neighbor_cost, free_mem, estimate_timestamp, 1.0, 1.0, 1.0, 1.0001);
+            if (min_msps < 0 || msps < min_msps) {
+                min_msps = msps;
+                best = i;
+            }
+        }
+    }
+    return best;
+}
+
+void ChannelImpl::DynamicSublinear::merge(std::shared_ptr<DsuNode> &x, std::shared_ptr<DsuNode> &y) {
+    auto&& f_x = find_father(x);
+    auto&& f_y = find_father(y);
+    if (f_x.get() == f_y.get()) {
+        return;
+    }
+    f_y->t += f_x->t;
+    f_x->parent = f_y;
+}
+
+std::shared_ptr<DsuNode> ChannelImpl::DynamicSublinear::find_father(std::shared_ptr<DsuNode>& x) {
+    if (x->is_root()) {
+        return x;
+    } else {
+        auto&& fa = find_father(x->parent);
+        return x->parent = fa;
+    }
+}
+
+void ChannelImpl::DynamicSublinear::insert_candidate(TensorInfo* ptr) {
+    candidates.insert(ptr);
+    if (!comp_node.valid()) {
+        comp_node = ptr->ptr->comp_node();
+    }
+}
+
+void ChannelImpl::DynamicSublinear::erase_candidate(TensorInfo* ptr) {
+    candidates.erase(ptr);
+}
+
+void ChannelImpl::DynamicSublinear::update_used_time(TensorInfo* ptr) {
+    ptr->last_used_time = estimate_timestamp;
+}
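
The merge and find_father routines above implement a small disjoint-set union over DsuNode, where every set accumulates the recompute cost (t) of the evicted tensors it contains and find_father performs path compression. A Python transcription of the same bookkeeping, for illustration only (not part of the patch):

    class DsuNode:
        def __init__(self, t):
            self.parent = None   # a root node has no parent
            self.t = t           # accumulated recompute cost of the set

    def find_father(x):
        # Return the representative of x's set, compressing the path.
        if x.parent is None:
            return x
        x.parent = find_father(x.parent)
        return x.parent

    def merge(x, y):
        # Union the two sets; the surviving root carries the summed cost.
        fx, fy = find_father(x), find_father(y)
        if fx is fy:
            return
        fy.t += fx.t
        fx.parent = fy
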
diff --git a/imperative/src/impl/interpreter/interpreter_impl.h b/imperative/src/impl/interpreter/interpreter_impl.h
index 0757c6635aa7b0f6796cf6e663ed62fba4259ce0..72136fecef1a79425018fb5aafb9f12587e2e141 100644
--- a/imperative/src/impl/interpreter/interpreter_impl.h
+++ b/imperative/src/impl/interpreter/interpreter_impl.h
@@ -63,8 +63,8 @@ struct ChannelImpl : Interpreter::Channel {
     void sync() override;
     void close() override;
-    int get_option(std::string name) override;
-    void set_option(std::string name, int value) override;
+    size_t get_option(std::string name) override;
+    void set_option(std::string name, size_t value) override;
     void start_profile(std::unordered_map option) override;
     void stop_profile(std::string basename, std::string format) override;
@@ -74,18 +74,22 @@ struct ChannelImpl : Interpreter::Channel {
 private:
     TensorInfo* alloc();
     void free(TensorInfo*);
+    void real_free(TensorInfo*);
+    void recursive_free(TensorInfo*);
+    void do_drop(TensorInfo*, bool);
     void detach_users(TensorInfo*);
     void process_one_task(IdentifiedCommand&);
     void check_worker_exc_unsafe();
-    void produce_tensor(TensorInfo* dest, TensorPtr ptr);
+    void produce_tensor(TensorInfo* dest, TensorPtr ptr, bool notice);
     void release_tensor(TensorInfo* dest);
     void regenerate(TensorInfo* dest);
     void recompute(TensorInfo::ComputePath* path);
+
     void dispatch_default_cpu(
         std::shared_ptr op,
@@ -180,7 +184,6 @@ private:
     //! level 1: user side errors are sync;
     //! level 0: both sync.
     int m_async_level = 2;
-    int m_max_recompute_time = 1;
 
     struct State {
         std::thread::id tid;
@@ -201,6 +204,112 @@ private:
     ChannelState m_channel_state;
     WorkerState m_worker_state;
+
+
+    /*!
+     * \brief A framework of dynamic sublinear memory optimization
+     *
+     * Note: The main idea is that during the training process, if the memory
+     * usage exceeds the threshold, select some tensors to evict until the
+     * memory usage is below the threshold.
+     */
+    struct DynamicSublinear {
+        /*!
+         * \brief find an available tensor with the largest evaluation function
+         *
+         * Note: An available tensor must satisfy: (1) has computing path,
+         * (2) is in memory, (3) is not pinned. Evaluation function refers to:
+         * @see: TensorInfo::eval_func.
+         *
+         * \return the pointer of the best tensor; nullptr is returned if no
+         * available tensor is found
+         */
+        TensorInfo* find_best_tensor();
+
+        /*!
+         * \brief estimate the cost of recomputing tensor ptr
+         *
+         * Note: We define the cost as the sum of the costs of each evicted
+         * components where all the neighbors of ptr are located.
+         */
+        double estimate_neighbor_cost(TensorInfo* ptr);
+
+        /*!
+         * \brief update the last used time of the tensor ptr
+         */
+        void update_used_time(TensorInfo* ptr);
+
+        /*!
+         * \brief merge the two specified sets (the set in which the element x
+         * is located, and the set in which the element y is located)
+         */
+        void merge(std::shared_ptr<DsuNode> &x, std::shared_ptr<DsuNode> &y);
+
+        /*!
+         * \brief return the representative of the set that contains the
+         * element x
+         */
+        std::shared_ptr<DsuNode> find_father(std::shared_ptr<DsuNode> &x);
+
+        /*!
+         * \brief update DSU after recomputing tensor ptr
+         *
+         * Delete ptr from the set where ptr is located. Since DSU does not
+         * support this operation, instead, we reset the DSU father of ptr, and
+         * subtract the recomputation cost of ptr from the cost of the original
+         * set.
+         */
+        void update_dsu_after_recompute(TensorInfo* ptr);
+
+        /*!
+         * \brief update DSU after evicting tensor ptr
+         *
+         * Check the neighbors of x, that is, the input and output tensors, and
+         * if they are evicted, merge their respective sets.
+         */
+        void update_dsu_after_evict(TensorInfo* ptr);
+
+        /*!
+         * \brief pin the tensors in vec
+         */
+        void pin(const SmallVector<TensorInfo*>& vec);
+
+        /*!
+         * \brief unpin the tensors in vec
+         */
+        void unpin(const SmallVector<TensorInfo*>& vec);
+
+        /*!
+         * \brief add the tensor to the candidate set
+         *
+         * If the size of the tensor does not exceed the minimum threshold,
+         * it will do nothing.
+         */
+        void insert_candidate(TensorInfo* ptr);
+
+        /*!
+         * \brief erase the tensor from the candidate set
+         *
+         * If the size of the tensor does not exceed the minimum threshold,
+         * it will do nothing.
+         */
+        void erase_candidate(TensorInfo* ptr);
+
+        //! estimate the current time, in order to reduce the overhead of timer
+        double estimate_timestamp = 0;
+
+        //! the comp node where dynamic sublinear memory optimization works
+        CompNode comp_node;
+
+        //! store all tensors that may be evicted
+        std::unordered_set<TensorInfo*> candidates;
+
+        //! whether the warning message has been printed
+        bool warn_printed = false;
+    } m_dtr;
+
+    //! automatically evict an optimal tensor
+    void auto_evict();
 };
 
 } // namespace mgb::imperative::interpreter::intl
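
The DynamicSublinear struct documented above drives a simple greedy policy: whenever used memory on the tracked comp node exceeds the budget, keep evicting the candidate with the best heuristic score until usage falls below the budget or no candidate remains. A schematic version of that loop, with hypothetical helper names standing in for the real members (it only mirrors the shape of ChannelImpl::auto_evict, it is not the implementation):

    def auto_evict(dtr, memory_budget):
        # dtr: hypothetical object exposing used_memory(), find_best_tensor()
        # and evict(); memory_budget is in bytes.
        current = dtr.used_memory()
        while current > memory_budget:
            best = dtr.find_best_tensor()
            if best is None:
                break                    # nothing evictable, give up
            current -= best.memory       # optimistic: assume the blob is freed
            dtr.evict(best)
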
diff --git a/imperative/src/impl/interpreter/option_manager.h b/imperative/src/impl/interpreter/option_manager.h
index 287a278ccf0d431d400916196610890b8c17a8b0..cae5532c30a650aa049086875cf8b192db3d4045 100644
--- a/imperative/src/impl/interpreter/option_manager.h
+++ b/imperative/src/impl/interpreter/option_manager.h
@@ -20,10 +20,10 @@ namespace mgb::imperative::interpreter::intl {
 struct OptionManager {
 private:
-    std::unordered_map<std::string, int*> m_option_map = {};
+    std::unordered_map<std::string, size_t*> m_option_map = {};
 public:
 #define DEF_OPTION(name, env_key, default_value, desc) \
-    int name = (m_option_map[#name]=&name, get_option_from_env(env_key, default_value));
+    size_t name = (m_option_map[#name]=&name, get_option_from_env(env_key, default_value));
 
     DEF_OPTION(async_level, "MEGENGINE_INTERP_ASYNC_LEVEL", 2,
             "config whether raise error exactly when invoking op.\n"
@@ -39,20 +39,26 @@ public:
             "set command buffer length.");
     DEF_OPTION(enable_host_compute, "MEGENGINE_HOST_COMPUTE", 1,
             "enable host compute, thus computation may be done in host event if it's device is gpu.");
+    DEF_OPTION(enable_auto_drop, "MEGENGINE_AUTO_DROP", 0, "");
+    DEF_OPTION(memory_budget, "MEGENGINE_MEMORY_BUDGET", 0,
+            "auto drop will start whenever gpu memory usage exceeds this value.");
+    DEF_OPTION(tensor_lowerbound, "MEGENGINE_TENSOR_LOWERBOUND", 1048576,
+            "the minimum memory value of a tensor added to the candidate set");
+    DEF_OPTION(record_computing_path, "MEGENGINE_RECORD_COMPUTING_PATH", 0, "");
 #undef DEF_OPTION
 
-    void set_option(const std::string& name, int value) {
+    void set_option(const std::string& name, size_t value) {
         *m_option_map[name] = value;
     }
-    int get_option(const std::string& name) const {
+    size_t get_option(const std::string& name) const {
         return *m_option_map.at(name);
     }
-    static int get_option_from_env(const std::string& name, int default_value) {
+    static size_t get_option_from_env(const std::string& name, size_t default_value) {
         if (const char* env_val = MGB_GETENV(name.c_str())) {
-            default_value = std::atoi(env_val);
+            sscanf(env_val, "%zu", &default_value);
         }
         return default_value;
     }
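
Because every DEF_OPTION above also reads its default from the environment through get_option_from_env, the new knobs can be tried without code changes. A hedged example of seeding them from Python before MegEngine is imported (the values are arbitrary; the variable names are exactly the env_key strings above):

    import os

    # Must be set before megengine is imported, because the options are read
    # from the environment when the interpreter options are constructed.
    os.environ["MEGENGINE_AUTO_DROP"] = "1"
    os.environ["MEGENGINE_MEMORY_BUDGET"] = str(5 * 1024 ** 3)
    os.environ["MEGENGINE_TENSOR_LOWERBOUND"] = str(1048576)
    os.environ["MEGENGINE_RECORD_COMPUTING_PATH"] = "1"

    import megengine  # noqa: E402
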
diff --git a/imperative/src/impl/interpreter/tensor_info.h b/imperative/src/impl/interpreter/tensor_info.h
index eeeceb9f5965845cf2ea35137cd568145ebb5327..7acd104d857d5d077bd3d71f69b7fff0b7cee5df 100644
--- a/imperative/src/impl/interpreter/tensor_info.h
+++ b/imperative/src/impl/interpreter/tensor_info.h
@@ -25,6 +25,24 @@ enum EvictType {
     DROP = 2,
 };
 
+/*!
+ * \brief an identifier to specify a component of evicted tensors
+ *
+ * Each component tracks the sum of the compute costs of its elements, with the
+ * union of two components having the sum of each constituent cost.
+ */
+struct DsuNode {
+    DsuNode(double _t): t(_t) {}
+
+    std::shared_ptr<DsuNode> parent;
+
+    bool is_root() {
+        return !bool(parent);
+    }
+
+    double t;
+};
+
 struct TensorInfo;
 using TensorInfoPtr = std::shared_ptr<TensorInfo>;
@@ -37,6 +55,10 @@ struct TensorInfo {
     TensorPtr ptr;
     LogicalTensorDesc desc;
+
+    double compute_time;
+    size_t memory;
+    double last_used_time;
+
     // FIXME: broken by drop
     bool value_fetched = false;
     bool invalid = false;
@@ -49,12 +71,15 @@ struct TensorInfo {
     // reserved for auto drop
     size_t pinned = 0;
     size_t recompute_times = 0;
+    size_t ref_cnt = 0;
+    std::shared_ptr<DsuNode> dsu_ptr;
 
     struct ComputePath {
         std::shared_ptr<OpDef> op;
         SmallVector<TensorInfo*> inputs;
         SmallVector<TensorInfo*> unique_inputs;
         SmallVector<TensorInfo*> outputs;
+        double compute_time = 0;
 
         size_t ref_cnt() {
             return outputs.size() - std::count(outputs.begin(), outputs.end(), nullptr);
         }
@@ -78,9 +103,19 @@ struct TensorInfo {
             for (auto output: outputs) {
                 output->producer = path;
             }
+            // update ref_cnt
+            for (auto input: inputs) {
+                input->ref_cnt += outputs.size();
+            }
             return path;
         }
     }* producer = nullptr;
+
+    double eval_func(double cost, double free_mem, double cur_time,
+                     double param_cost, double param_mem, double param_time, double param_recompute_times) {
+        return pow(cost + 1e-3, param_cost) * pow(param_recompute_times, (double)recompute_times)
+               / (pow((memory + free_mem) / 1024.0 / 1024.0, param_mem) * pow((double)(cur_time - last_used_time + 1e-3), param_time));
+    }
 
     void pin() {
         ++pinned;
@@ -106,6 +141,10 @@ struct TensorInfo {
         producer = nullptr;
     }
 
+    bool size_exceeds_thd(size_t thd) {
+        return memory > thd;
+    }
+
     SmallVector<TensorInfo*> users;
 };
 }
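
eval_func above scores an eviction candidate in the spirit of the DTR heuristic: recompute cost of the tensor and its evicted neighborhood in the numerator, freed memory times staleness in the denominator, with a penalty that grows with recompute_times; find_best_tensor then evicts the candidate with the minimum score. A plain Python transcription of the same formula, useful for sanity-checking the exponents (arguments correspond one-to-one to the C++ parameters; nothing here is part of the patch):

    def eval_func(cost, free_mem, cur_time, memory, last_used_time,
                  recompute_times, param_cost=1.0, param_mem=1.0,
                  param_time=1.0, param_recompute_times=1.0001):
        numerator = (cost + 1e-3) ** param_cost \
            * param_recompute_times ** recompute_times
        denominator = ((memory + free_mem) / 1024.0 / 1024.0) ** param_mem \
            * (cur_time - last_used_time + 1e-3) ** param_time
        return numerator / denominator
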
diff --git a/imperative/src/include/megbrain/imperative/interpreter.h b/imperative/src/include/megbrain/imperative/interpreter.h
index ff500a001df0b4d1f22cd4f8db5c922ba57008f8..d9de1308560515bc4d0163e3cb68cfd044af5236 100644
--- a/imperative/src/include/megbrain/imperative/interpreter.h
+++ b/imperative/src/include/megbrain/imperative/interpreter.h
@@ -44,8 +44,8 @@ struct Interpreter {
     virtual void sync() = 0;
     virtual void close() = 0;
 
-    virtual int get_option(std::string name) = 0;
-    virtual void set_option(std::string name, int value) = 0;
+    virtual size_t get_option(std::string name) = 0;
+    virtual void set_option(std::string name, size_t value) = 0;
 
     virtual void start_profile(std::unordered_map option) = 0;
     virtual void stop_profile(std::string basename, std::string format) = 0;