From a9ea41c5e251e2cf8b15d286e938a961d8c1cb28 Mon Sep 17 00:00:00 2001 From: liutiexing <74819124+liutiexing@users.noreply.github.com> Date: Wed, 29 Sep 2021 15:10:03 +0800 Subject: [PATCH] Spinlock (#36030) * add align for WorkQueue * add spinlock * merge spinlock --- .../fluid/framework/new_executor/run_queue.h | 10 +++-- .../fluid/framework/new_executor/workqueue.cc | 4 +- .../framework/new_executor/workqueue_utils.h | 1 + paddle/fluid/memory/allocation/spin_lock.h | 43 ++++++++++++------- 4 files changed, 36 insertions(+), 22 deletions(-) diff --git a/paddle/fluid/framework/new_executor/run_queue.h b/paddle/fluid/framework/new_executor/run_queue.h index 13035237ff8..e457b20a3c3 100644 --- a/paddle/fluid/framework/new_executor/run_queue.h +++ b/paddle/fluid/framework/new_executor/run_queue.h @@ -37,6 +37,8 @@ #include #include #include +#include "paddle/fluid/framework/new_executor/workqueue_utils.h" +#include "paddle/fluid/memory/allocation/spin_lock.h" namespace paddle { namespace framework { @@ -101,7 +103,7 @@ class RunQueue { // PushBack adds w at the end of the queue. // If queue is full returns w, otherwise returns default-constructed Work. Work PushBack(Work w) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); Elem* e = &array_[(back - 1) & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); @@ -123,7 +125,7 @@ class RunQueue { return Work(); } - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); Elem* e = &array_[back & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); @@ -145,7 +147,7 @@ class RunQueue { return 0; } - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); unsigned size = Size(); unsigned mid = back; @@ -213,7 +215,7 @@ class RunQueue { // modification counters. alignas(64) std::atomic front_; alignas(64) std::atomic back_; - std::mutex mutex_; + paddle::memory::SpinLock mutex_; Elem array_[kSize]; // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false, diff --git a/paddle/fluid/framework/new_executor/workqueue.cc b/paddle/fluid/framework/new_executor/workqueue.cc index bc5a4e27dc5..8c6eeab4d5c 100644 --- a/paddle/fluid/framework/new_executor/workqueue.cc +++ b/paddle/fluid/framework/new_executor/workqueue.cc @@ -166,7 +166,7 @@ std::unique_ptr CreateMultiThreadedWorkQueue( "WorkQueueOptions.num_threads must be " "greater than 1.")); std::unique_ptr ptr(new WorkQueueImpl(options)); - return ptr; + return std::move(ptr); } std::unique_ptr CreateWorkQueueGroup( @@ -176,7 +176,7 @@ std::unique_ptr CreateWorkQueueGroup( "For a WorkQueueGroup, the number of WorkQueueOptions " "must be greater than 1.")); std::unique_ptr ptr(new WorkQueueGroupImpl(queues_options)); - return ptr; + return std::move(ptr); } } // namespace framework diff --git a/paddle/fluid/framework/new_executor/workqueue_utils.h b/paddle/fluid/framework/new_executor/workqueue_utils.h index 6907f2f17da..bb219fea362 100644 --- a/paddle/fluid/framework/new_executor/workqueue_utils.h +++ b/paddle/fluid/framework/new_executor/workqueue_utils.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include diff --git a/paddle/fluid/memory/allocation/spin_lock.h b/paddle/fluid/memory/allocation/spin_lock.h index 42462fd74b4..2bbe340e7c6 100644 --- a/paddle/fluid/memory/allocation/spin_lock.h +++ b/paddle/fluid/memory/allocation/spin_lock.h @@ -15,37 +15,48 @@ #pragma once #include -#if !defined(_WIN32) -#include -#else -#include -#endif // !_WIN32 +#if defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || \ + defined(__i386__) +#define __PADDLE_x86__ +#include +#endif +#include #include "paddle/fluid/platform/macros.h" namespace paddle { namespace memory { +static inline void CpuRelax() { +#if defined(__PADDLE_x86__) + _mm_pause(); +#endif +} class SpinLock { public: SpinLock() : mlock_(false) {} void lock() { - bool expect = false; - uint64_t spin_cnt = 0; - while (!mlock_.compare_exchange_weak(expect, true)) { - expect = false; - if ((++spin_cnt & 0xFF) == 0) { -#if defined(_WIN32) - SleepEx(50, FALSE); -#else - sched_yield(); -#endif + for (;;) { + if (!mlock_.exchange(true, std::memory_order_acquire)) { + break; + } + constexpr int kMaxLoop = 32; + for (int loop = 1; mlock_.load(std::memory_order_relaxed);) { + if (loop <= kMaxLoop) { + for (int i = 1; i <= loop; ++i) { + CpuRelax(); + } + loop *= 2; + } else { + std::this_thread::yield(); + } } } } - void unlock() { mlock_.store(false); } + void unlock() { mlock_.store(false, std::memory_order_release); } + DISABLE_COPY_AND_ASSIGN(SpinLock); private: -- GitLab