diff --git a/paddle/fluid/framework/new_executor/run_queue.h b/paddle/fluid/framework/new_executor/run_queue.h index 13035237ff8b48a82763d739713ca489ca902e7c..e457b20a3c35d551e726d10c31abd6797ebed76c 100644 --- a/paddle/fluid/framework/new_executor/run_queue.h +++ b/paddle/fluid/framework/new_executor/run_queue.h @@ -37,6 +37,8 @@ #include #include #include +#include "paddle/fluid/framework/new_executor/workqueue_utils.h" +#include "paddle/fluid/memory/allocation/spin_lock.h" namespace paddle { namespace framework { @@ -101,7 +103,7 @@ class RunQueue { // PushBack adds w at the end of the queue. // If queue is full returns w, otherwise returns default-constructed Work. Work PushBack(Work w) { - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); Elem* e = &array_[(back - 1) & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); @@ -123,7 +125,7 @@ class RunQueue { return Work(); } - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); Elem* e = &array_[back & kMask]; uint8_t s = e->state.load(std::memory_order_relaxed); @@ -145,7 +147,7 @@ class RunQueue { return 0; } - std::unique_lock lock(mutex_); + std::unique_lock lock(mutex_); unsigned back = back_.load(std::memory_order_relaxed); unsigned size = Size(); unsigned mid = back; @@ -213,7 +215,7 @@ class RunQueue { // modification counters. alignas(64) std::atomic front_; alignas(64) std::atomic back_; - std::mutex mutex_; + paddle::memory::SpinLock mutex_; Elem array_[kSize]; // SizeOrNotEmpty returns current queue size; if NeedSizeEstimate is false, diff --git a/paddle/fluid/framework/new_executor/workqueue.cc b/paddle/fluid/framework/new_executor/workqueue.cc index bc5a4e27dc528a611cc4195f4d3c7d62e057fd15..8c6eeab4d5c0a1553392b19dc11d34035679ff03 100644 --- a/paddle/fluid/framework/new_executor/workqueue.cc +++ b/paddle/fluid/framework/new_executor/workqueue.cc @@ -166,7 +166,7 @@ std::unique_ptr CreateMultiThreadedWorkQueue( "WorkQueueOptions.num_threads must be " "greater than 1.")); std::unique_ptr ptr(new WorkQueueImpl(options)); - return ptr; + return std::move(ptr); } std::unique_ptr CreateWorkQueueGroup( @@ -176,7 +176,7 @@ std::unique_ptr CreateWorkQueueGroup( "For a WorkQueueGroup, the number of WorkQueueOptions " "must be greater than 1.")); std::unique_ptr ptr(new WorkQueueGroupImpl(queues_options)); - return ptr; + return std::move(ptr); } } // namespace framework diff --git a/paddle/fluid/framework/new_executor/workqueue_utils.h b/paddle/fluid/framework/new_executor/workqueue_utils.h index 6907f2f17da0db38cecbab93b87da6890b38f904..bb219fea36267a64d5d7f27e98458f395ecab209 100644 --- a/paddle/fluid/framework/new_executor/workqueue_utils.h +++ b/paddle/fluid/framework/new_executor/workqueue_utils.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include diff --git a/paddle/fluid/memory/allocation/spin_lock.h b/paddle/fluid/memory/allocation/spin_lock.h index 42462fd74b4cd7d47e59cf67ff0c9467ee66f7b9..2bbe340e7c69124b1e84f67780fc1891bdf1932f 100644 --- a/paddle/fluid/memory/allocation/spin_lock.h +++ b/paddle/fluid/memory/allocation/spin_lock.h @@ -15,37 +15,48 @@ #pragma once #include -#if !defined(_WIN32) -#include -#else -#include -#endif // !_WIN32 +#if defined(_M_X64) || defined(__x86_64__) || defined(_M_IX86) || \ + defined(__i386__) +#define __PADDLE_x86__ +#include +#endif +#include #include "paddle/fluid/platform/macros.h" namespace paddle { namespace memory { +static inline void CpuRelax() { +#if defined(__PADDLE_x86__) + _mm_pause(); +#endif +} class SpinLock { public: SpinLock() : mlock_(false) {} void lock() { - bool expect = false; - uint64_t spin_cnt = 0; - while (!mlock_.compare_exchange_weak(expect, true)) { - expect = false; - if ((++spin_cnt & 0xFF) == 0) { -#if defined(_WIN32) - SleepEx(50, FALSE); -#else - sched_yield(); -#endif + for (;;) { + if (!mlock_.exchange(true, std::memory_order_acquire)) { + break; + } + constexpr int kMaxLoop = 32; + for (int loop = 1; mlock_.load(std::memory_order_relaxed);) { + if (loop <= kMaxLoop) { + for (int i = 1; i <= loop; ++i) { + CpuRelax(); + } + loop *= 2; + } else { + std::this_thread::yield(); + } } } } - void unlock() { mlock_.store(false); } + void unlock() { mlock_.store(false, std::memory_order_release); } + DISABLE_COPY_AND_ASSIGN(SpinLock); private: