From 0035efa6368e6ebce20747bc2487a1c151db0c2f Mon Sep 17 00:00:00 2001 From: Megvii Engine Team Date: Wed, 24 Feb 2021 15:26:12 +0800 Subject: [PATCH] fix(mge): limit task queue size GitOrigin-RevId: 9481d389140608eaec347bafc7b9b8f180786142 --- .../src/impl/interpreter/interpreter_impl.h | 3 +- .../include/megbrain/utils/thread_impl_0.h | 4 +-- .../include/megbrain/utils/thread_impl_1.h | 36 +++++++++++++------ 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/imperative/src/impl/interpreter/interpreter_impl.h b/imperative/src/impl/interpreter/interpreter_impl.h index 49a2fff30..0757c6635 100644 --- a/imperative/src/impl/interpreter/interpreter_impl.h +++ b/imperative/src/impl/interpreter/interpreter_impl.h @@ -120,8 +120,9 @@ private: // set max_spin=0 to prevent Queue fetch task in busy wait manner. // this won't affect throughput when python interpreter is sending enough task, // but will significantly save CPU time when waiting for task, e.g. wait for data input + // limit pending tasks to 1000000 WorkQueue(ChannelImpl* owner) - : AsyncQueueSC(0), m_owner(owner) { + : AsyncQueueSC(0, 1000000), m_owner(owner) { sys::set_thread_name("interpreter"); } void process_one_task(IdentifiedCommand& icmd) { diff --git a/src/core/include/megbrain/utils/thread_impl_0.h b/src/core/include/megbrain/utils/thread_impl_0.h index 973934642..763ae23d0 100644 --- a/src/core/include/megbrain/utils/thread_impl_0.h +++ b/src/core/include/megbrain/utils/thread_impl_0.h @@ -55,8 +55,7 @@ namespace mgb { template class AsyncQueueSC: public NonCopyableObj { public: - AsyncQueueSC() {} - AsyncQueueSC(size_t max_spin) {} + AsyncQueueSC(ptrdiff_t max_spin = -1, ptrdiff_t max_items = -1) {} virtual ~AsyncQueueSC() = default; @@ -91,4 +90,3 @@ namespace mgb { } // vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}} - diff --git a/src/core/include/megbrain/utils/thread_impl_1.h b/src/core/include/megbrain/utils/thread_impl_1.h index d5d540abc..9f2abcefa 100644 --- 
a/src/core/include/megbrain/utils/thread_impl_1.h +++ b/src/core/include/megbrain/utils/thread_impl_1.h @@ -156,11 +156,16 @@ namespace mgb { }; public: - AsyncQueueSC() : m_synchronizer(SCQueueSynchronizer::get_default_max_spin()) {} - - //! specify max spin manually, caller must ensure the given value is optimal, - //! otherwise caller should leave the value adjustable by user. - AsyncQueueSC(size_t max_spin) : m_synchronizer(max_spin) {} + //! \param max_spin specify max spin manually, caller must ensure the given value + //! is optimal, otherwise caller should leave the value adjustable by user. + //! \param max_items limit memory usage by number of items + AsyncQueueSC(ptrdiff_t max_spin = -1, ptrdiff_t max_items = -1) + : m_synchronizer(max_spin >= 0 ? max_spin : SCQueueSynchronizer::get_default_max_spin()) { + if (max_items >= 0) { + // -1 / 2 == 0 + m_block_quota = (max_items - 1) / BLOCK_SIZE + 1; + } + } #ifdef WIN32 bool check_is_into_atexit() { if (SCQueueSynchronizer::is_into_atexit) { @@ -290,8 +295,10 @@ namespace mgb { TaskBlock *m_queue_tail = nullptr; std::atomic_size_t m_queue_tail_tid{0}, //!< id of next task m_finished_task{0}; + size_t m_block_quota = std::numeric_limits::max(); std::vector> m_free_task_block; Spinlock m_mutex; + std::condition_variable_any m_cv; SyncedParam *m_cur_task = nullptr; SCQueueSynchronizer m_synchronizer; #if MGB_ENABLE_EXCEPTION @@ -354,12 +361,18 @@ namespace mgb { std::unique_ptr allocate_task_block_unsafe( TaskBlock *prev) { std::unique_ptr ret; - if (!m_free_task_block.empty()) { - ret = std::move(m_free_task_block.back()); - m_free_task_block.pop_back(); - } else { - ret = std::make_unique(); - } + do { + if (!m_free_task_block.empty()) { + ret = std::move(m_free_task_block.back()); + m_free_task_block.pop_back(); + } else if (m_block_quota > 0) { + ret = std::make_unique(); + m_block_quota--; + } else { + m_cv.wait(m_mutex); + continue; + } + } while (!ret); /* `continue` jumps to this condition check, so it must stay true until a block has been obtained; with a constant false the loop would exit after the wait and the next statement would dereference a null ret */ ret->first_tid = m_new_block_first_tid; 
m_new_block_first_tid += BLOCK_SIZE; ret->prev = prev; @@ -402,6 +415,7 @@ namespace mgb { } else { m_queue_tail = nullptr; } + m_cv.notify_one(); } SyncedParam &cur = m_queue_head->params[qh ++]; -- GitLab