diff --git a/src/core/impl/comp_node/cpu/comp_node.cpp b/src/core/impl/comp_node/cpu/comp_node.cpp index 38e4b4de262f3245acb4e680c000015d79f7058d..903712a263b2b4078420fd766821f69dffa47e7d 100644 --- a/src/core/impl/comp_node/cpu/comp_node.cpp +++ b/src/core/impl/comp_node/cpu/comp_node.cpp @@ -995,7 +995,7 @@ bool CpuCompNode::CpuDispatchableBase::EventImpl::do_finished() { } void CpuCompNode::CpuDispatchableBase::EventImpl::host_wait_cv() { - for (size_t i = 0, it = SCQueueSynchronizer::max_spin() / 20; i < it; ++i) { + for (size_t i = 0, it = SCQueueSynchronizer::get_default_max_spin() / 20; i < it; ++i) { if (finished()) { return; } diff --git a/src/core/impl/utils/comp_node_sync_manager.cpp b/src/core/impl/utils/comp_node_sync_manager.cpp index a5bf776d4ebd076695040958544f8f8b74679dc5..912e10b0b62f6b8c2f64f34a1d9d423c90438c1c 100644 --- a/src/core/impl/utils/comp_node_sync_manager.cpp +++ b/src/core/impl/utils/comp_node_sync_manager.cpp @@ -73,7 +73,7 @@ CompNodeSyncManager& CompNodeSyncManager::busy_wait_set_ready() { "before actually waiting on a tensor," " you must call set_has_waiter first"); - size_t spin = 0, max_spin = SCQueueSynchronizer::max_spin(); + size_t spin = 0, max_spin = SCQueueSynchronizer::get_default_max_spin(); while (!m_nr_ready.load()) { ++spin; if (spin >= max_spin) { diff --git a/src/core/impl/utils/thread.cpp b/src/core/impl/utils/thread.cpp index f6f72c2432174827f80d368f620c604092143eab..d222888096f24ddf2cbe53a944f424eb788ca87d 100644 --- a/src/core/impl/utils/thread.cpp +++ b/src/core/impl/utils/thread.cpp @@ -72,26 +72,28 @@ namespace { } /* =============== SCQueueSynchronizer =============== */ -size_t SCQueueSynchronizer::cached_max_spin = 0; +size_t SCQueueSynchronizer::cached_default_max_spin = 0; #ifdef WIN32 bool SCQueueSynchronizer::is_into_atexit = false; #endif -size_t SCQueueSynchronizer::max_spin() { - if (cached_max_spin) - return cached_max_spin; +size_t SCQueueSynchronizer::get_default_max_spin() { + if (cached_default_max_spin) + return cached_default_max_spin; if (MGB_GETENV("MGB_WORKER_NO_SLEEP")) { mgb_log_warn("worker would not sleep"); - return cached_max_spin = std::numeric_limits::max(); + return cached_default_max_spin = std::numeric_limits::max(); } if (auto spin_string = MGB_GETENV("MGB_WORKER_MAX_SPIN")) { auto spin = std::stoi(spin_string); mgb_log_warn("worker would execute with spin of %d", spin); - return cached_max_spin = spin; + return cached_default_max_spin = spin; } + // heuristically, let CPU spinning around 5ms at most before CPU yield. + // we are going to measure how many spins will spent 5ms on current platform. std::atomic_bool start{false}, stop{false}; size_t cnt; double cnt_time; @@ -115,11 +117,13 @@ size_t SCQueueSynchronizer::max_spin() { } stop.store(true); worker.join(); - cached_max_spin = std::max(cnt * (5 / cnt_time), 100000); - return cached_max_spin; + cached_default_max_spin = std::max(cnt * (5 / cnt_time), 100000); + return cached_default_max_spin; } -SCQueueSynchronizer::SCQueueSynchronizer() = default; +SCQueueSynchronizer::SCQueueSynchronizer(size_t max_spin) { + m_max_spin = max_spin; +} SCQueueSynchronizer::~SCQueueSynchronizer() noexcept { if (!m_worker_started) @@ -203,13 +207,13 @@ void SCQueueSynchronizer::producer_wait() { size_t SCQueueSynchronizer::consumer_fetch(size_t max, size_t min) { mgb_assert(max >= min && min >= 1); - size_t spin = 0, max_spin = SCQueueSynchronizer::max_spin(), + size_t spin = 0, cur_finished = m_finished_task.load(std::memory_order_relaxed); // relaxed mem order suffices because acquire would be called for ret while (m_tot_task.load(std::memory_order_relaxed) < cur_finished + min) { ++ spin; - if (spin >= max_spin) { + if (spin >= m_max_spin) { while (m_consumer_waiting.test_and_set(std::memory_order_relaxed)); SpinlockReleaser releaser(m_consumer_waiting); diff --git a/src/core/include/megbrain/utils/thread_impl_0.h b/src/core/include/megbrain/utils/thread_impl_0.h index fccf44de7e55ba5074b36361f99a66676334522f..44b169b1c3fc5c7e5394a4c44da5582a41535434 100644 --- a/src/core/include/megbrain/utils/thread_impl_0.h +++ b/src/core/include/megbrain/utils/thread_impl_0.h @@ -46,15 +46,18 @@ namespace mgb { class SCQueueSynchronizer { public: - static size_t max_spin() { - return 0; - } + SCQueueSynchronizer(size_t max_spin) {} + + static size_t get_default_max_spin() { return 0; } }; // tasks would be dispatched inplace template class AsyncQueueSC: public NonCopyableObj { public: + AsyncQueueSC() {} + AsyncQueueSC(size_t max_spin) {} + virtual ~AsyncQueueSC() = default; void add_task(const Param ¶m) { diff --git a/src/core/include/megbrain/utils/thread_impl_1.h b/src/core/include/megbrain/utils/thread_impl_1.h index 137ab59e4cd0f83b95df35f89161c9306dbcacb4..3a8e5f8481dd9254427198133c6a06c5554e95e0 100644 --- a/src/core/include/megbrain/utils/thread_impl_1.h +++ b/src/core/include/megbrain/utils/thread_impl_1.h @@ -50,7 +50,11 @@ namespace mgb { * wrap around within a practical time, which would crash the system. */ class SCQueueSynchronizer { - static size_t cached_max_spin; + //! cached value for global default max spin, read and stored by get_default_max_spin + static size_t cached_default_max_spin; + + //! synchronizer wait at most m_max_spin before CPU yield + size_t m_max_spin; std::atomic_flag m_consumer_waiting = ATOMIC_FLAG_INIT; std::atomic_bool m_should_exit{false}; bool m_worker_started = false, m_wait_finish_called = false; @@ -65,7 +69,8 @@ namespace mgb { std::thread m_worker_thread; public: - SCQueueSynchronizer(); + SCQueueSynchronizer(size_t max_spin); + ~SCQueueSynchronizer() noexcept; bool worker_started() const { @@ -79,7 +84,8 @@ namespace mgb { } #endif - static size_t max_spin(); + //! get global default max spin from env + static size_t get_default_max_spin(); void start_worker(std::thread thread); @@ -150,6 +156,11 @@ namespace mgb { }; public: + AsyncQueueSC() : m_synchronizer(SCQueueSynchronizer::get_default_max_spin()) {} + + //! specify max spin manually, caller must ensure the given value is optimal, + //! otherwise caller should leave the value adjustable by user. + AsyncQueueSC(size_t max_spin) : m_synchronizer(max_spin) {} #ifdef WIN32 bool check_is_into_atexit() { if (SCQueueSynchronizer::is_into_atexit) { diff --git a/src/core/test/utils/thread.cpp b/src/core/test/utils/thread.cpp index 160477c4a14f0274543b546545a5b39bfbd31417..a3846f1707910edde43981359432c208889dfd76 100644 --- a/src/core/test/utils/thread.cpp +++ b/src/core/test/utils/thread.cpp @@ -43,7 +43,7 @@ namespace { template void test_scq_sync_multi_producer() { size_t nr_worker_call = 0; - SCQueueSynchronizer sync; + SCQueueSynchronizer sync(0); auto worker = [&]() { RNGxorshf rng{next_rand_seed()}; while (auto nr = sync.consumer_fetch(1)) { @@ -87,7 +87,7 @@ namespace { TEST(TestAsyncQueue, Synchronizer) { size_t nr_worker_call = 0; - SCQueueSynchronizer sync; + SCQueueSynchronizer sync(0); auto worker = [&]() { for (; ;) { auto nr = sync.consumer_fetch(1); @@ -115,7 +115,7 @@ TEST(TestAsyncQueue, Synchronizer) { TEST(TestAsyncQueue, SynchronizerWaitOverhead) { { size_t nr_worker_call = 0; - SCQueueSynchronizer sync; + SCQueueSynchronizer sync(0); auto worker = [&]() { for (;;) { auto nr = sync.consumer_fetch(1); @@ -141,7 +141,7 @@ TEST(TestAsyncQueue, SynchronizerWaitOverhead) { double worker_time = 0, avg_await; { size_t nr_worker_call = 0; - SCQueueSynchronizer sync; + SCQueueSynchronizer sync(0); auto worker = [&]() { for (;;) { auto nr = sync.consumer_fetch(1); @@ -188,7 +188,7 @@ TEST(TestAsyncQueue, SynchronizerMultiProducer3) { } TEST(TestAsyncQueue, SynchronizerWaiterStarving) { - SCQueueSynchronizer sync; + SCQueueSynchronizer sync(0); std::atomic_size_t processed{0}; auto worker = [&]() { while (sync.consumer_fetch(1)) {