diff --git a/imperative/python/src/grad.cpp b/imperative/python/src/grad.cpp
index 6680b7218f0e07d8390fc526e7449a2931b97ce5..75f1ba8dc14b041eb7ff722d38882cc585f6b4ff 100644
--- a/imperative/python/src/grad.cpp
+++ b/imperative/python/src/grad.cpp
@@ -14,6 +14,7 @@
 #include "megbrain/imperative/backward_graph_opt.h"
 #include "megbrain/imperative/ops/autogen.h"
 #include "megbrain/imperative/proxy_graph_detail.h"
+#include "megbrain/imperative/resource_manager.h"
 #include "megbrain/utils/mempool.h"
 
 #include "range/v3/all.hpp"
diff --git a/imperative/python/src/tensor.cpp b/imperative/python/src/tensor.cpp
index 1f74c27ffc3aee8c6c52eb98c21ec12015f8f77e..8d160e9aaa58e21dfa37259f46f6f99f2e9b91e1 100644
--- a/imperative/python/src/tensor.cpp
+++ b/imperative/python/src/tensor.cpp
@@ -1158,11 +1158,16 @@ void init_tensor(py::module m) {
     using Segment = TransformationManager::Segment;
 
-    auto* channel = interpreter::Interpreter::inst().create_channel().release();
+    using Channel = interpreter::Interpreter::Channel;
+
+    auto* channel =
+            imperative::ResourceManager::create_global<std::unique_ptr<Channel>>(
+                    interpreter::Interpreter::inst().create_channel())
+                    ->get();
     interpreter_for_py = channel;
     transformations.register_at<Segment::Eval>(
             std::make_shared<InterpreterTransformation>(
-                    std::unique_ptr<Channel>(channel)));
+                    std::shared_ptr<Channel>(channel, [](Channel*) {})));
     transformations.register_at<Segment::Scalar>(
             std::make_shared<ScalarTransformation>());
diff --git a/imperative/src/impl/async_releaser.h b/imperative/src/impl/async_releaser.h
index faade92a8ea8386bdbd8d00f98a37665b46abd68..6cf0d6dc4b7eec751bdf5b875708d190f8f42273 100644
--- a/imperative/src/impl/async_releaser.h
+++ b/imperative/src/impl/async_releaser.h
@@ -13,6 +13,7 @@
 
 #include "megbrain/comp_node.h"
 #include "megbrain/imperative/blob_manager.h"
+#include "megbrain/imperative/resource_manager.h"
 #include "megbrain/system.h"
 
 #include "./event_pool.h"
@@ -61,8 +62,8 @@ protected:
 
 public:
     static AsyncReleaser* inst() {
-        static AsyncReleaser releaser;
-        return &releaser;
+        static auto* releaser = ResourceManager::create_global<AsyncReleaser>();
+        return releaser;
     }
 
     ~AsyncReleaser() { m_waiter.wait_task_queue_empty(); }
diff --git a/imperative/src/impl/event_pool.cpp b/imperative/src/impl/event_pool.cpp
index 94a2a789222419b708e578558160f4e8e1eb46fa..66888ad6d3d5f5a4da2ed6ab23c93bcea16700bb 100644
--- a/imperative/src/impl/event_pool.cpp
+++ b/imperative/src/impl/event_pool.cpp
@@ -10,6 +10,9 @@
  */
 
 #include "./event_pool.h"
+#include <memory>
+
+#include "megbrain/imperative/resource_manager.h"
 
 namespace mgb {
 namespace imperative {
@@ -17,22 +20,18 @@ namespace imperative {
 EventPool::EventPool(size_t flags) : m_flags{flags} {}
 
 EventPool& EventPool::with_timer() {
-    static Spinlock lock;
-    static std::unique_ptr<EventPool> ptr;
-    MGB_LOCK_GUARD(lock);
-    if (!ptr || ptr->is_finalized()) {
-        ptr.reset(new EventPool(CompNode::Event::NEED_TIMER));
-    }
-    return *ptr;
+    static auto* sm_pool =
+            ResourceManager::create_global<CompNodeDependentResource<EventPool>>([] {
+                return std::unique_ptr<EventPool>(
+                        new EventPool(CompNode::Event::NEED_TIMER));
+            });
+    return **sm_pool;
 }
 EventPool& EventPool::without_timer() {
-    static Spinlock lock;
-    static std::unique_ptr<EventPool> ptr;
-    MGB_LOCK_GUARD(lock);
-    if (!ptr || ptr->is_finalized()) {
-        ptr.reset(new EventPool());
-    }
-    return *ptr;
+    static auto* sm_pool =
+            ResourceManager::create_global<CompNodeDependentResource<EventPool>>(
+                    [] { return std::unique_ptr<EventPool>(new EventPool()); });
+    return **sm_pool;
 }
 CompNode::Event* EventPool::alloc(CompNode cn) {
     CompNode::EventPool* pool;
diff --git a/imperative/src/impl/event_pool.h b/imperative/src/impl/event_pool.h
index 4d6df0bb6db7986c9594db5ec83b7af6ee4e6724..300b039047fdb3b2b653db28fca8e6907e42e8af 100644
--- a/imperative/src/impl/event_pool.h
+++ b/imperative/src/impl/event_pool.h
@@ -31,6 +31,8 @@ public:
     void free(CompNode::Event* event);
     std::shared_ptr<void> on_comp_node_finalize();
     ~EventPool();
+
+    using CompNodeDepedentObject::is_finalized;
 };
 } // namespace imperative
 } // namespace mgb
diff --git a/imperative/src/impl/op_def.cpp b/imperative/src/impl/op_def.cpp
index b555154d86196fbc2cd65ddc545bff9f7cf66d9d..e8c396205df71a649fcbf65f9c3099de1ec80ac5 100644
--- a/imperative/src/impl/op_def.cpp
+++ b/imperative/src/impl/op_def.cpp
@@ -14,6 +14,7 @@
 #include <sstream>
 
 #include "megbrain/imperative/ops/opr_attr.h"
+#include "megbrain/imperative/resource_manager.h"
 
 #include "./op_trait.h"
@@ -63,16 +64,16 @@ EncodedSubgraph OpDef::make_backward_graph(
         const SmallVector<bool>& output_has_grad) {
     using BackwardGraphCache =
             OpMethResultCache<EncodedSubgraph, SmallVector<bool>, SmallVector<bool>>;
-    thread_local auto cache = std::make_unique<BackwardGraphCache>();
+    thread_local auto& cache = *ResourceManager::create_local<BackwardGraphCache>();
     BackwardGraphCache::key_t cache_key{
             const_cast<OpDef&>(def).shared_from_this(),
             inputs,
             {input_requires_grad, output_has_grad}};
-    auto iter = cache->find(cache_key);
-    if (iter == cache->end()) {
-        iter = cache->insert({cache_key, def.trait()->make_backward_graph(
-                                                 def, inputs, input_requires_grad,
-                                                 output_has_grad)})
+    auto iter = cache.find(cache_key);
+    if (iter == cache.end()) {
+        iter = cache.insert({cache_key, def.trait()->make_backward_graph(
+                                                def, inputs, input_requires_grad,
+                                                output_has_grad)})
                        .first;
     }
     return iter->second;
@@ -86,12 +87,12 @@ EncodedSubgraph OpDef::make_forward_graph(
        const OpDef& def, const SmallVector<LogicalTensorDesc>& inputs) {
     using ForwardGraphCache =
             OpMethResultCache<EncodedSubgraph, SmallVector<bool>, SmallVector<bool>>;
-    thread_local auto cache = std::make_unique<ForwardGraphCache>();
+    thread_local auto& cache = *ResourceManager::create_local<ForwardGraphCache>();
     ForwardGraphCache::key_t cache_key{
             const_cast<OpDef&>(def).shared_from_this(), inputs};
-    auto iter = cache->find(cache_key);
-    if (iter == cache->end()) {
-        iter = cache->insert({cache_key, def.trait()->make_forward_graph(def, inputs)})
+    auto iter = cache.find(cache_key);
+    if (iter == cache.end()) {
+        iter = cache.insert({cache_key, def.trait()->make_forward_graph(def, inputs)})
                        .first;
     }
     return iter->second;
diff --git a/imperative/src/impl/ops/utility.cpp b/imperative/src/impl/ops/utility.cpp
index 8ac834f249c54fdb89bdab9aab5094ce3c33015a..39940ac723443131ac50f8ddfbe35c4756aa7ca2 100644
--- a/imperative/src/impl/ops/utility.cpp
+++ b/imperative/src/impl/ops/utility.cpp
@@ -9,6 +9,7 @@
  * "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  */
 
+#include <atomic>
 #include "megbrain/imperative/graph_cache.h"
@@ -16,6 +17,7 @@
 #include "megbrain/imperative/ops/autogen.h"
 #include "megbrain/imperative/ops/opr_attr.h"
 #include "megbrain/imperative/ops/utility.h"
+#include "megbrain/imperative/resource_manager.h"
 #include "megbrain/imperative/subgraph_detail.h"
 #include "megbrain/opr/io.h"
 #include "megbrain/opr/tensor_gen.h"
@@ -510,16 +512,32 @@ struct ComputingGraphHolder {
     }
 };
 
+static std::atomic<size_t> nr_cg_cache = 0;
+
 template <typename Kind>
 ComputingGraphHolder<Kind>& get_computing_graph(
         std::shared_ptr<OpDef> compiled_op, const SmallVector<LogicalTensorDesc>& descs) {
     using ComputingGraphHolderCache =
             OpMethResultCache<std::queue<std::unique_ptr<ComputingGraphHolder<Kind>>>>;
-    thread_local auto cache = std::make_unique<ComputingGraphHolderCache>();
+    thread_local auto& cache = ([]() -> auto& {
+        mgb_assert(
+                nr_cg_cache++ < 5,
+                "using subgraph in too many threads, this causes resource leakage");
+#if MGB_CUDA && defined(WIN32)
+        // FIXME: Create as global to skip resource finalize and windows with cuda
+        // doesn't cleanup global resources
+        return *ResourceManager::create_global<ComputingGraphHolderCache>();
+#else
+        // Otherwise this should be local because compnode may be unusable when global
+        // resource finalizing.
+        // For example, CpuCompNode.sync hang on because underlying thread died
+        return *ResourceManager::create_local<ComputingGraphHolderCache>();
+#endif
+    })();
     thread_local size_t nr_cg_holders = 0;
     typename ComputingGraphHolderCache::key_t cache_key = {compiled_op, descs};
-    auto& cg_holder_queue = (*cache)[cache_key];
+    auto& cg_holder_queue = cache[cache_key];
     std::unique_ptr<ComputingGraphHolder<Kind>> holder;
     if (!cg_holder_queue.empty()) {
         // pick one
diff --git a/imperative/src/impl/physical_tensor.cpp b/imperative/src/impl/physical_tensor.cpp
index f294adafb8d63849f477dad762864ebead7e307d..7f4c0710a4582b8c48555512cfd01df7941dd0c0 100644
--- a/imperative/src/impl/physical_tensor.cpp
+++ b/imperative/src/impl/physical_tensor.cpp
@@ -12,6 +12,7 @@
 #include "megbrain/imperative.h"
 #include "megbrain/imperative/blob_manager.h"
 #include "megbrain/imperative/profiler.h"
+#include "megbrain/imperative/resource_manager.h"
 
 #include "./async_releaser.h"
 #include "./event_pool.h"
@@ -30,13 +31,6 @@ class CompNodeSyncManager : public CompNodeDepedentObject {
     std::mutex m_mtx;
 
 public:
-#if MGB_CUDA && defined(WIN32)
-    //! FIXME: windows cuda driver shutdown before call atexit function even
-    //! register atexit function after init cuda driver! as a workround
-    //! recovery resource by OS temporarily, may need remove this after
-    //! upgrade cuda runtime
-    static bool is_into_atexit;
-#endif
     std::shared_ptr<void> on_comp_node_finalize() override {
         MGB_LOCK_GUARD(m_mtx);
         m_blob2event.clear();
@@ -44,17 +38,7 @@ public:
     }
 
     static CompNodeSyncManager& inst() {
-        static CompNodeSyncManager* sl_inst = new CompNodeSyncManager();
-#if MGB_CUDA && defined(WIN32)
-        //! FIXME: windows cuda driver shutdown before call atexit function even
-        //! register atexit function after init cuda driver! as a workround
-        //! recovery resource by OS temporarily, may need remove this after
-        //! upgrade cuda runtime
-        if (!is_into_atexit) {
-            auto err = atexit([] { is_into_atexit = true; });
-            mgb_assert(!err, "failed to register atexit function");
-        }
-#endif
+        static auto* sl_inst = ResourceManager::create_global<CompNodeSyncManager>();
         return *sl_inst;
     }
 
@@ -73,13 +57,6 @@ public:
         m_blob2event.erase(blob);
     }
 };
-#if MGB_CUDA && defined(WIN32)
-//! FIXME: windows cuda driver shutdown before call atexit function even
-//! register atexit function after init cuda driver! as a workround
-//! recovery resource by OS temporarily, may need remove this after
-//! upgrade cuda runtime
-bool CompNodeSyncManager::is_into_atexit = false;
-#endif
 
 } // namespace
 
@@ -106,15 +83,6 @@ Blob::Blob(CompNode cn, size_t sz) : m_comp_node{cn}, m_storage{}, m_size{sz} {
 
 Blob::~Blob() {
     BlobManager::inst()->unregister_blob(this);
-
-#if MGB_CUDA && defined(WIN32)
-    //! FIXME: windows cuda driver shutdown before call atexit function even
-    //! register atexit function after init cuda driver! as a workround
-    //! recovery resource by OS temporarily, may need remove this after
-    //! upgrade cuda runtime
-    if (CompNodeSyncManager::is_into_atexit)
-        return;
-#endif
     CompNodeSyncManager::inst().remove(this);
 }
 
@@ -242,8 +210,6 @@ void Tensor::static_initialize() {
     AsyncReleaser::inst();
     CompNodeSyncManager::inst();
     MultiCNConstTensorCache::inst();
-    // clean all CompNodeDepedentObjects
-    mgb_assert(!atexit(CompNode::finalize), "atexit register failed");
 }
 
 } // namespace imperative
diff --git a/imperative/src/impl/resource_manager.cpp b/imperative/src/impl/resource_manager.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..49375b1cc0bc9b02c3c30a7c0c5c24206031830b
--- /dev/null
+++ b/imperative/src/impl/resource_manager.cpp
@@ -0,0 +1,95 @@
+/**
+ * \file imperative/src/impl/resource_manager.cpp
+ * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
+ *
+ * Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+#include "megbrain/imperative/resource_manager.h"
+
+#include <thread>
+#include <unordered_map>
+
+using namespace mgb;
+using namespace imperative;
+
+namespace {
+
+class LocalResourceManager;
+
+std::unordered_map<std::thread::id, std::shared_ptr<LocalResourceManager>>
+        local_managers;
+std::mutex global_lock;
+bool throw_all_resources = false;
+
+class LocalResourceManager final : public ResourceManager {
+private:
+    std::thread::id m_id;
+
+public:
+    LocalResourceManager() : m_id(std::this_thread::get_id()) {}
+
+    std::thread::id id() const { return m_id; }
+};
+
+class GlobalResourceManager final : public ResourceManager {
+public:
+    ~GlobalResourceManager() {
+#if MGB_CUDA && defined(WIN32)
+        //! FIXME: windows cuda driver shutdown before call atexit function even
+        //! register atexit function after init cuda driver! as a workround
+        //! recovery resource by OS temporarily, may need remove this after
+        //! upgrade cuda runtime
+        throw_all_resources = true;
+#endif
+        MGB_LOCK_GUARD(global_lock);
+        local_managers.clear();
+    }
+};
+
+class LocalResourceManagerRef : public NonCopyableObj {
+private:
+    std::weak_ptr<LocalResourceManager> m_manager;
+
+public:
+    LocalResourceManagerRef() {
+        auto manager = std::make_shared<LocalResourceManager>();
+        mgb_assert(
+                local_managers.insert({manager->id(), manager}).second,
+                "duplicated local manager");
+        m_manager = manager;
+    }
+
+    ~LocalResourceManagerRef() {
+        if (auto manager = m_manager.lock()) {
+            local_managers.erase(manager->id());
+        }
+    }
+
+    ResourceManager& operator*() { return *m_manager.lock(); }
+};
+
+} // namespace
+
+void ResourceManager::clear() {
+    if (throw_all_resources) {
+        new std::vector<std::any>(std::move(m_handles));
+    }
+    for (auto iter = m_handles.rbegin(); iter != m_handles.rend(); ++iter) {
+        (*iter) = {};
+    }
+}
+
+ResourceManager& ResourceManager::get_global() {
+    static GlobalResourceManager sl_manager;
+    return sl_manager;
+}
+
+ResourceManager& ResourceManager::get_local() {
+    thread_local LocalResourceManagerRef tl_manager;
+    return *tl_manager;
+}
diff --git a/imperative/src/impl/transformations/grad.cpp b/imperative/src/impl/transformations/grad.cpp
index 5a3023ae82efb81fbdef1658df1adc2906b2a18a..bc968a607f01c9b18abb6631d3d9efa8ce2c7ca5 100644
--- a/imperative/src/impl/transformations/grad.cpp
+++ b/imperative/src/impl/transformations/grad.cpp
@@ -12,6 +12,7 @@
 #include "megbrain/imperative/transformations/grad.h"
 
 #include "megbrain/imperative/graph_cache.h"
+#include "megbrain/imperative/resource_manager.h"
 
 #include <range/v3/all.hpp>
 
@@ -24,7 +25,8 @@ static std::shared_ptr<OptimizedBackwardGraph> make_optimized_backward_gra
     // hash
     using OptimizedBackwardGraphCache = OpMethResultCache<
             std::shared_ptr<OptimizedBackwardGraph>, SmallVector<bool>>;
-    thread_local auto cache = std::make_unique<OptimizedBackwardGraphCache>();
+    thread_local auto& cache =
+            *ResourceManager::create_local<OptimizedBackwardGraphCache>();
     OptimizedBackwardGraphCache::key_t cache_key{op};
     SmallVector<LogicalTensorDesc>& input_descs = cache_key.inputs;
     std::get<0>(cache_key.extras) = inputs_require_grad.copy_into<SmallVector<bool>>();
@@ -34,8 +36,8 @@ static std::shared_ptr<OptimizedBackwardGraph> make_optimized_backward_gra
         input_descs[i].comp_node = inputs[i].device().cast<CompNodeValue>();
     }
 
-    auto iter = cache->find(cache_key);
-    if (iter != cache->end()) {
+    auto iter = cache.find(cache_key);
+    if (iter != cache.end()) {
         return iter->second;
     }
 
@@ -47,7 +49,7 @@ static std::shared_ptr<OptimizedBackwardGraph> make_optimized_backward_gra
     if (!bg.graph.empty()) {
         ret = std::make_shared<OptimizedBackwardGraph>(bg);
     }
-    cache->emplace(cache_key, ret);
+    cache.emplace(cache_key, ret);
     return ret;
 }
diff --git a/imperative/src/include/megbrain/imperative/physical_tensor.h b/imperative/src/include/megbrain/imperative/physical_tensor.h
index d19c565a0fd10b09ae1a5b038566e1e02889608a..46ef85047083124e5503bfb0015974165a8c94a5 100644
--- a/imperative/src/include/megbrain/imperative/physical_tensor.h
+++ b/imperative/src/include/megbrain/imperative/physical_tensor.h
@@ -14,6 +14,7 @@
 #include <memory>
 #include <mutex>
 
+#include "megbrain/imperative/resource_manager.h"
 #include "megbrain/tensor.h"
 
 namespace mgb {
@@ -278,8 +279,9 @@ struct MultiCNConstTensorCache : CompNodeDepedentObject {
     }
 
     static MultiCNConstTensorCache& inst() {
-        static MultiCNConstTensorCache sl_inst;
-        return sl_inst;
+        static auto* sl_inst =
+                ResourceManager::create_global<MultiCNConstTensorCache>();
+        return *sl_inst;
     }
 };
diff --git a/imperative/src/include/megbrain/imperative/resource_manager.h b/imperative/src/include/megbrain/imperative/resource_manager.h
new file mode 100644
index 0000000000000000000000000000000000000000..47eacdecafdcc9ddd820e62c4300983f822a5aa0
--- /dev/null
+++ b/imperative/src/include/megbrain/imperative/resource_manager.h
@@ -0,0 +1,87 @@
+/**
+ * \file imperative/src/include/megbrain/imperative/resource_manager.h
+ * MegEngine is Licensed under the Apache License, Version 2.0 (the "License")
+ *
+ * Copyright (c) 2014-2021 Megvii Inc. All rights reserved.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT ARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+#pragma once
+
+#include <any>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <vector>
+
+#include "megbrain/common.h"
+#include "megbrain/utils/metahelper.h"
+#include "megbrain/utils/thread.h"
+
+namespace mgb {
+namespace imperative {
+
+class ResourceManager : public NonCopyableObj {
+protected:
+    std::vector<std::any> m_handles;
+    std::mutex m_mutex;
+
+private:
+    static ResourceManager& get_global();
+    static ResourceManager& get_local();
+
+public:
+    template <typename T, typename... TArgs>
+    static T* create_global(TArgs&&... args) {
+        mgb_log_debug("create global resource: %s", typeid(T).name());
+        auto instance = std::make_shared<T>(std::forward<TArgs>(args)...);
+        auto& manager = get_global();
+        MGB_LOCK_GUARD(manager.m_mutex);
+        manager.m_handles.push_back((std::any)instance);
+        return instance.get();
+    }
+
+    template <typename T, typename... TArgs>
+    static T* create_local(TArgs&&... args) {
+        mgb_log_debug("create local resource: %s", typeid(T).name());
+        auto instance = std::make_shared<T>(std::forward<TArgs>(args)...);
+        get_local().m_handles.push_back((std::any)instance);
+        return instance.get();
+    }
+
+    void clear();
+
+    ~ResourceManager() { clear(); }
+};
+
+template <typename T>
+class CompNodeDependentResource : public NonCopyableObj {
+private:
+    std::function<std::unique_ptr<T>()> m_ctor;
+    std::unique_ptr<T> m_ptr;
+    Spinlock m_spin;
+
+public:
+    explicit CompNodeDependentResource(std::function<std::unique_ptr<T>()> ctor)
+            : m_ctor(ctor) {}
+
+    T& operator*() {
+        if ((!m_ptr) || m_ptr->is_finalized()) {
+            m_ptr = m_ctor();
+        }
+        return *m_ptr;
+    }
+
+    T* operator->() {
+        if ((!m_ptr) || m_ptr->is_finalized()) {
+            m_ptr = m_ctor();
+        }
+        return m_ptr.get();
+    }
+};
+
+} // namespace imperative
+} // namespace mgb
diff --git a/imperative/src/include/megbrain/imperative/transformations/eval.h b/imperative/src/include/megbrain/imperative/transformations/eval.h
index a35c74de6a66a4485839b8d1cd2250e3927e81e4..ff5b21532ca02b7de63326be65b61089475c44d1 100644
--- a/imperative/src/include/megbrain/imperative/transformations/eval.h
+++ b/imperative/src/include/megbrain/imperative/transformations/eval.h
@@ -63,10 +63,10 @@ public:
     using Channel = Interpreter::Channel;
 
 private:
-    std::unique_ptr<Channel> m_channel;
+    std::shared_ptr<Channel> m_channel;
 
 public:
-    explicit InterpreterTransformation(std::unique_ptr<Channel> channel)
+    explicit InterpreterTransformation(std::shared_ptr<Channel> channel)
            : m_channel{std::move(channel)} {}
 
     Channel* channel() { return m_channel.get(); }
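
Note (not part of the patch): the usage pattern this diff introduces can be summarized with the minimal sketch below. It relies only on the new resource_manager.h API shown above; MyGlobalService and MyThreadLocalCache are hypothetical placeholder types standing in for the AsyncReleaser-style singletons and the thread_local OpMethResultCache instances touched by the patch.

#include "megbrain/imperative/resource_manager.h"

using namespace mgb::imperative;

// Hypothetical stand-ins for the real resources managed in this patch.
struct MyGlobalService {};
struct MyThreadLocalCache {};

MyGlobalService& global_service() {
    // Registered with the global ResourceManager; all such handles are
    // cleared in reverse creation order when GlobalResourceManager is
    // destroyed, instead of relying on atexit()/static destruction order.
    static auto* sl_inst = ResourceManager::create_global<MyGlobalService>();
    return *sl_inst;
}

MyThreadLocalCache& local_cache() {
    // Registered with the per-thread ResourceManager; released when the
    // owning thread's LocalResourceManagerRef goes away, so the cache does
    // not outlive the thread that created it.
    thread_local auto& cache =
            *ResourceManager::create_local<MyThreadLocalCache>();
    return cache;
}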