diff --git a/imperative/python/megengine/__init__.py b/imperative/python/megengine/__init__.py
index d43f16b37bc858dbddffff27985d29b0fda6d810..95f32e9ad3835f4de61a47f779e9ce328f5c5fc0 100644
--- a/imperative/python/megengine/__init__.py
+++ b/imperative/python/megengine/__init__.py
@@ -76,6 +76,7 @@ from .core._imperative_rt.core2 import full_sync as _full_sync
 from .core._imperative_rt.core2 import sync as _sync
 from .core._imperative_rt.utils import _set_fork_exec_path_for_timed_func
 from .device import *
+from .dtr import *
 from .logger import enable_debug_log, get_logger, set_log_file, set_log_level
 from .serialization import load, save
 from .tensor import Parameter, Tensor, tensor
diff --git a/imperative/python/megengine/dtr.py b/imperative/python/megengine/dtr.py
new file mode 100644
index 0000000000000000000000000000000000000000..d586b69e0f73a3faab30a15e1eb163f5fb1d8473
--- /dev/null
+++ b/imperative/python/megengine/dtr.py
@@ -0,0 +1,156 @@
+# -*- coding: utf-8 -*-
+# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
+#
+# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+import re
+from typing import Union
+
+from mprop import mproperty
+
+from .core._imperative_rt.core2 import set_option
+from .core._imperative_rt.utils import _set_defrag
+
+_eviction_threshold = 0
+_evictee_minimum_size = 1024 ** 2
+
+
+def str2bytes(text: str) -> int:
+    r"""
+    Converts a human-readable size string into a number of bytes.
+
+    The whole string has to be a single non-negative decimal number followed
+    by a unit. Units are case-insensitive and binary (1KB = 1024B); whitespace
+    around the string or between number and unit is ignored. A fractional
+    number of bytes is truncated.
+
+    :param text: size description, e.g. ``"2GB"``, ``"1.5 mb"`` or ``"512B"``.
+    :return: the size in bytes.
+    :raises ValueError: if `text` is not exactly one number followed by one
+        of the supported units.
+    """
+    regex = re.compile(r"(\d+(?:\.\d+)?)\s*([kmg]?b)", re.IGNORECASE)
+    order = ["b", "kb", "mb", "gb"]
+    # Match the entire string: searching for a match anywhere would silently
+    # accept malformed input such as "-2GB" or "1.5.5GB" by parsing only the
+    # part that happens to fit.
+    result = regex.fullmatch(text.strip())
+    if result is None:
+        raise ValueError(
+            "Formatting of `value` only supports bytes(B), kilobyte(KB), megabyte(MB) and gigabyte(GB) units"
+        )
+    number, unit = result.groups()
+    return int(float(number) * 1024 ** order.index(unit.lower()))
+
+
+@mproperty
+def eviction_threshold(mod):
+    r"""
+    Returns the eviction threshold in bytes.
+
+    .. note::
+
+       When GPU memory usage exceeds this value, DTR will heuristically select
+       and evict resident tensors until the amount of used memory falls below
+       this threshold.
+
+    """
+    return mod._eviction_threshold
+
+
+@eviction_threshold.setter
+def eviction_threshold(mod, value: Union[int, str]):
+    r"""
+    Change the eviction threshold. If `value` is an int, it represents the
+    number of bytes. If `value` is a string, its formatting supports bytes(B),
+    kilobyte(KB), megabyte(MB) and gigabyte(GB) units.
+
+    Examples:
+
+    .. code-block::
+
+        import megengine as mge
+        mge.dtr.eviction_threshold = 2 * 1024 ** 3
+        mge.dtr.eviction_threshold = "2GB"
+        mge.dtr.eviction_threshold = "2048MB"
+
+    """
+    if isinstance(value, str):
+        mod._eviction_threshold = mod.str2bytes(value)
+    elif isinstance(value, int):
+        mod._eviction_threshold = value
+    else:
+        raise TypeError("`value` should be a str or an int")
+    set_option("dtr_eviction_threshold", mod._eviction_threshold)
+
+
+@mproperty
+def evictee_minimum_size(mod):
+    r"""
+    Returns the memory threshold of tensors in bytes.
+
+    .. note::
+
+       Only tensors whose size exceeds this threshold will be added to the
+       candidate set. A tensor that is not added to the candidate set will
+       never be evicted during its lifetime.
+
+    """
+    return mod._evictee_minimum_size
+
+
+@evictee_minimum_size.setter
+def evictee_minimum_size(mod, value: Union[int, str]):
+    r"""
+    Change the memory threshold of tensors. If `value` is an int, it represents
+    the number of bytes. If `value` is a string, its formatting supports bytes(B),
+    kilobyte(KB), megabyte(MB) and gigabyte(GB) units.
+
+    Examples:
+
+    .. code-block::
+
+        import megengine as mge
+        mge.dtr.evictee_minimum_size = 2 * 1024 ** 2
+        mge.dtr.evictee_minimum_size = "2MB"
+        mge.dtr.evictee_minimum_size = "2048KB"
+
+    """
+    if isinstance(value, str):
+        mod._evictee_minimum_size = mod.str2bytes(value)
+    elif isinstance(value, int):
+        mod._evictee_minimum_size = value
+    else:
+        raise TypeError("`value` should be a str or an int")
+    set_option("dtr_evictee_minimum_size", mod._evictee_minimum_size)
+
+
+def enable():
+    r"""
+    Enable to record computing path of tensors and to perform DTR policy.
+
+    Besides switching on the ``enable_dtr_auto_drop``, ``enable_drop`` and
+    ``record_computing_path`` options, this calls ``_set_defrag(True)`` and
+    sets ``buffer_length`` to 0.
+    """
+    _set_defrag(True)
+    set_option("enable_dtr_auto_drop", 1)
+    set_option("enable_drop", 1)
+    set_option("buffer_length", 0)
+    set_option("record_computing_path", 1)
+
+
+def disable():
+    r"""
+    Stop recording computing path of tensors and performing DTR policy.
+
+    Only the ``enable_dtr_auto_drop``, ``enable_drop`` and
+    ``record_computing_path`` options are switched off; ``buffer_length`` and
+    the ``_set_defrag`` setting applied by :func:`enable` are left untouched.
+    """
+    set_option("enable_dtr_auto_drop", 0)
+    set_option("enable_drop", 0)
+    set_option("record_computing_path", 0)
diff --git a/imperative/python/megengine/utils/dtr.py b/imperative/python/megengine/utils/dtr.py
deleted file mode 100644
index 5b3c4a8d292a31fd395d6f47504ea3161240c9b8..0000000000000000000000000000000000000000
--- a/imperative/python/megengine/utils/dtr.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# -*- coding: utf-8 -*-
-# MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
-#
-# Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
-# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - -from ..core._imperative_rt.core2 import set_option -from ..core._imperative_rt.utils import _set_defrag - - -class DTR: - r""" - DTR implements `Dynamic Tensor Rematerialization `_ in MegEngine. - - It is basically an online algorithm for checkpointing driven by certain eviction policies. - - .. code-block:: - - from megengine.utils.dtr import DTR - - ds = DTR(memory_budget=5*1024**3) - - # your training code - - """ - - def __init__(self, memory_budget=0, tensor_lowerbound=1048576): - r""" - :param memory_budget: int. The threshold of memory usage. When memory - usage exceeds this value, auto evict will be triggered. - :param tensor_lowerbound: int. The minimum memory limit of the tensor - that can be evicted. Default: 1MB. - """ - if memory_budget > 0: - set_option("enable_auto_drop", 1) - set_option("enable_drop", 1) - set_option("buffer_length", 0) - set_option("memory_budget", memory_budget) - set_option("tensor_lowerbound", tensor_lowerbound) - set_option("record_computing_path", 1) - _set_defrag(True) diff --git a/imperative/python/requires.txt b/imperative/python/requires.txt index abcbb2453661e1daa05bdff658775823ca7fc681..4921869aecc0730bd582dd89f17882784a801328 100644 --- a/imperative/python/requires.txt +++ b/imperative/python/requires.txt @@ -6,3 +6,4 @@ tabulate tqdm redispy deprecated +mprop diff --git a/imperative/src/impl/interpreter/interpreter_impl.cpp b/imperative/src/impl/interpreter/interpreter_impl.cpp index 848fbdeee3c2d2417ccaf925107da13c9b3b2dc7..1d8c98fbad700ca00d95cc943d335670f0d79021 100644 --- a/imperative/src/impl/interpreter/interpreter_impl.cpp +++ b/imperative/src/impl/interpreter/interpreter_impl.cpp @@ -422,7 +422,7 @@ void ChannelImpl::do_drop(TensorInfo* ptr, bool user=false) { } void 
ChannelImpl::free(TensorInfo* ptr) { - if (m_worker_state.options.enable_auto_drop) { + if (m_worker_state.options.enable_dtr_auto_drop) { // Evicting a tensor, rather than freeing it, can avoid pinning // potentially exploding amounts of memory and allow us to save // more memory. @@ -459,7 +459,7 @@ void ChannelImpl::real_free(TensorInfo* ptr) { if (m_channel_state.profiler->is_profiling()) { m_channel_state.profiler->record_host(ptr->id); } - if (ptr->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) { + if (ptr->size_exceeds_thd(m_worker_state.options.dtr_evictee_minimum_size)) { m_dtr.erase_candidate(ptr); } detach_users(ptr); @@ -487,7 +487,7 @@ void ChannelImpl::produce_tensor(TensorInfo* dest, TensorPtr ptr, bool notice=tr dest->memory = ptr->blob()->size(); dest->ptr = std::move(ptr); dest->evict_type = EvictType::NONE; - if (notice && dest->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) { + if (notice && dest->size_exceeds_thd(m_worker_state.options.dtr_evictee_minimum_size)) { m_dtr.insert_candidate(dest); } if (notice && m_waitee == dest) { @@ -519,7 +519,7 @@ void ChannelImpl::recompute(TensorInfo::ComputePath* path) { inputs.push_back(i->ptr); m_dtr.update_used_time(i); } - if (m_worker_state.options.enable_auto_drop && m_worker_state.options.memory_budget > 0) { + if (m_worker_state.options.enable_dtr_auto_drop && m_worker_state.options.dtr_eviction_threshold > 0) { auto_evict(); } auto outputs = OpDef::apply_on_physical_tensor(*path->op, inputs); @@ -531,7 +531,7 @@ void ChannelImpl::recompute(TensorInfo::ComputePath* path) { o->recompute_times ++; if (!o->ptr) { produce_tensor(o, std::move(outputs[i]), false); - if (m_worker_state.options.enable_auto_drop) { + if (m_worker_state.options.enable_dtr_auto_drop) { m_dtr.update_dsu_after_recompute(o); } } @@ -544,7 +544,7 @@ void ChannelImpl::auto_evict() { return; } size_t current_memory = m_dtr.comp_node.get_used_memory(); - while (current_memory > 
m_worker_state.options.memory_budget) { + while (current_memory > m_worker_state.options.dtr_eviction_threshold) { auto best = m_dtr.find_best_tensor(); if (!best) { if (!m_dtr.warn_printed) { @@ -642,7 +642,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { uint64_t apply_id = ++m_last_id; SmallVector tensor_inputs; SmallVector devices; - if (m_worker_state.options.enable_auto_drop) { + if (m_worker_state.options.enable_dtr_auto_drop) { m_dtr.pin(cmd.inputs); } for (auto i : cmd.inputs) { @@ -696,7 +696,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { m_worker_state.profiler->record_device(device, event_data); } } - if (m_worker_state.options.enable_auto_drop && m_worker_state.options.memory_budget > 0) { + if (m_worker_state.options.enable_dtr_auto_drop && m_worker_state.options.dtr_eviction_threshold > 0) { auto_evict(); } // Apply op @@ -712,7 +712,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { } // End profiling operator double estimate_compute_time = 0; - if (m_worker_state.options.enable_auto_drop) { + if (m_worker_state.options.enable_dtr_auto_drop) { for (auto i : cmd.inputs) { estimate_compute_time += i->memory; } @@ -735,7 +735,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { continue; } produce_tensor(cmd.outputs[i], std::move(tensor_outputs[i])); - if (m_worker_state.options.enable_auto_drop) { + if (m_worker_state.options.enable_dtr_auto_drop) { cmd.outputs[i]->dsu_ptr = std::make_shared(estimate_compute_time); } } @@ -774,7 +774,7 @@ void ChannelImpl::process_one_task(IdentifiedCommand& icmd) { TensorInfo::ComputePath::make(cmd.op, cmd.inputs, cmd.outputs); size_t detach_cnt = 0; for (auto output : cmd.outputs) { - if (!output->size_exceeds_thd(m_worker_state.options.tensor_lowerbound)) { + if (!output->size_exceeds_thd(m_worker_state.options.dtr_evictee_minimum_size)) { output->detach_producer(); detach_cnt ++; } diff --git a/imperative/src/impl/interpreter/option_manager.h 
b/imperative/src/impl/interpreter/option_manager.h index cae5532c30a650aa049086875cf8b192db3d4045..8100c15de793eed40e17f5d190e87f92c5957fcd 100644 --- a/imperative/src/impl/interpreter/option_manager.h +++ b/imperative/src/impl/interpreter/option_manager.h @@ -39,10 +39,10 @@ public: "set command buffer length."); DEF_OPTION(enable_host_compute, "MEGENGINE_HOST_COMPUTE", 1, "enable host compute, thus computation may be done in host event if it's device is gpu."); - DEF_OPTION(enable_auto_drop, "MEGENGINE_AUTO_DROP", 0, ""); - DEF_OPTION(memory_budget, "MEGENGINE_MEMORY_BUDGET", 0, + DEF_OPTION(enable_dtr_auto_drop, "MEGENGINE_DTR_AUTO_DROP", 0, ""); + DEF_OPTION(dtr_eviction_threshold, "MEGENGINE_DTR_EVICTION_THRESHOLD", 0, "auto drop will start whenever gpu memory usage exceeds this value."); - DEF_OPTION(tensor_lowerbound, "MEGENGINE_TENSOR_LOWERBOUND", 1048576, + DEF_OPTION(dtr_evictee_minimum_size, "MEGENGINE_DTR_EVICTEE_MINIMUM_SIZE", 1048576, "the minimum memory value of a tensor added to the candidate set"); DEF_OPTION(record_computing_path, "MEGENGINE_RECORD_COMPUTING_PATH", 0, "");