提交 0035efa6 编写于 作者: M Megvii Engine Team

fix(mge): limit task queue size

GitOrigin-RevId: 9481d389140608eaec347bafc7b9b8f180786142
上级 5a1f9134
...@@ -120,8 +120,9 @@ private: ...@@ -120,8 +120,9 @@ private:
// set max_spin=0 to prevent Queue fetch task in busy wait manner. // set max_spin=0 to prevent Queue fetch task in busy wait manner.
// this won't affect throughput when python interpreter is sending enough task, // this won't affect throughput when python interpreter is sending enough task,
// but will significantly save CPU time when waiting for task, e.g. wait for data input // but will significantly save CPU time when waiting for task, e.g. wait for data input
// limit pending tasks to 1000000
WorkQueue(ChannelImpl* owner) WorkQueue(ChannelImpl* owner)
: AsyncQueueSC<IdentifiedCommand, WorkQueue>(0), m_owner(owner) { : AsyncQueueSC<IdentifiedCommand, WorkQueue>(0, 1000000), m_owner(owner) {
sys::set_thread_name("interpreter"); sys::set_thread_name("interpreter");
} }
void process_one_task(IdentifiedCommand& icmd) { void process_one_task(IdentifiedCommand& icmd) {
......
...@@ -55,8 +55,7 @@ namespace mgb { ...@@ -55,8 +55,7 @@ namespace mgb {
template<typename Param, class TaskImpl> template<typename Param, class TaskImpl>
class AsyncQueueSC: public NonCopyableObj { class AsyncQueueSC: public NonCopyableObj {
public: public:
AsyncQueueSC() {} AsyncQueueSC(ptrdiff_t max_spin = -1, ptrdiff_t max_items = -1) {}
AsyncQueueSC(size_t max_spin) {}
virtual ~AsyncQueueSC() = default; virtual ~AsyncQueueSC() = default;
...@@ -91,4 +90,3 @@ namespace mgb { ...@@ -91,4 +90,3 @@ namespace mgb {
} }
// vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}} // vim: syntax=cpp.doxygen foldmethod=marker foldmarker=f{{{,f}}}
...@@ -156,11 +156,16 @@ namespace mgb { ...@@ -156,11 +156,16 @@ namespace mgb {
}; };
public: public:
AsyncQueueSC() : m_synchronizer(SCQueueSynchronizer::get_default_max_spin()) {} //! \param max_spin specify max spin manually, caller must ensure the given value
//! is optimal, otherwise caller should leave the value adjustable by user.
//! specify max spin manually, caller must ensure the given value is optimal, //! \param max_items limit memory usage by number of items
//! otherwise caller should leave the value adjustable by user. AsyncQueueSC(ptrdiff_t max_spin = -1, ptrdiff_t max_items = -1)
AsyncQueueSC(size_t max_spin) : m_synchronizer(max_spin) {} : m_synchronizer(max_spin >= 0 ? max_spin : SCQueueSynchronizer::get_default_max_spin()) {
if (max_items >= 0) {
// -1 / 2 == 0
m_block_quota = (max_items - 1) / BLOCK_SIZE + 1;
}
}
#ifdef WIN32 #ifdef WIN32
bool check_is_into_atexit() { bool check_is_into_atexit() {
if (SCQueueSynchronizer::is_into_atexit) { if (SCQueueSynchronizer::is_into_atexit) {
...@@ -290,8 +295,10 @@ namespace mgb { ...@@ -290,8 +295,10 @@ namespace mgb {
TaskBlock *m_queue_tail = nullptr; TaskBlock *m_queue_tail = nullptr;
std::atomic_size_t m_queue_tail_tid{0}, //!< id of next task std::atomic_size_t m_queue_tail_tid{0}, //!< id of next task
m_finished_task{0}; m_finished_task{0};
size_t m_block_quota = std::numeric_limits<size_t>::max();
std::vector<std::unique_ptr<TaskBlock>> m_free_task_block; std::vector<std::unique_ptr<TaskBlock>> m_free_task_block;
Spinlock m_mutex; Spinlock m_mutex;
std::condition_variable_any m_cv;
SyncedParam *m_cur_task = nullptr; SyncedParam *m_cur_task = nullptr;
SCQueueSynchronizer m_synchronizer; SCQueueSynchronizer m_synchronizer;
#if MGB_ENABLE_EXCEPTION #if MGB_ENABLE_EXCEPTION
...@@ -354,12 +361,18 @@ namespace mgb { ...@@ -354,12 +361,18 @@ namespace mgb {
std::unique_ptr<TaskBlock> allocate_task_block_unsafe( std::unique_ptr<TaskBlock> allocate_task_block_unsafe(
TaskBlock *prev) { TaskBlock *prev) {
std::unique_ptr<TaskBlock> ret; std::unique_ptr<TaskBlock> ret;
if (!m_free_task_block.empty()) { do {
ret = std::move(m_free_task_block.back()); if (!m_free_task_block.empty()) {
m_free_task_block.pop_back(); ret = std::move(m_free_task_block.back());
} else { m_free_task_block.pop_back();
ret = std::make_unique<TaskBlock>(); } else if (m_block_quota > 0) {
} ret = std::make_unique<TaskBlock>();
m_block_quota--;
} else {
m_cv.wait(m_mutex);
continue;
}
} while (false);
ret->first_tid = m_new_block_first_tid; ret->first_tid = m_new_block_first_tid;
m_new_block_first_tid += BLOCK_SIZE; m_new_block_first_tid += BLOCK_SIZE;
ret->prev = prev; ret->prev = prev;
...@@ -402,6 +415,7 @@ namespace mgb { ...@@ -402,6 +415,7 @@ namespace mgb {
} else { } else {
m_queue_tail = nullptr; m_queue_tail = nullptr;
} }
m_cv.notify_one();
} }
SyncedParam &cur = m_queue_head->params[qh ++]; SyncedParam &cur = m_queue_head->params[qh ++];
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册