From e95831662e66f59845868437a9f19076fba19456 Mon Sep 17 00:00:00 2001 From: seemingwang Date: Thu, 29 Jul 2021 12:35:20 +0800 Subject: [PATCH] fix block_queue problem (#34461) --- .../fluid/distributed/service/communicator.h | 70 +++++++++++++------ 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/paddle/fluid/distributed/service/communicator.h b/paddle/fluid/distributed/service/communicator.h index fa60cab2b5..9caec79e58 100644 --- a/paddle/fluid/distributed/service/communicator.h +++ b/paddle/fluid/distributed/service/communicator.h @@ -61,38 +61,65 @@ using Variable = framework::Variable; template class BlockingQueue { public: - explicit BlockingQueue(size_t capacity) : capacity_(capacity) { - PADDLE_ENFORCE_GT(capacity_, 0, - platform::errors::InvalidArgument( - "The capacity must be greater than 0.")); - } + explicit BlockingQueue(size_t capacity) : capacity_(capacity) {} bool Push(const T &elem) { - { - std::unique_lock lock(mutex_); - cv_.wait(lock, [&] { return queue_.size() < capacity_; }); - queue_.push_back(elem); + std::unique_lock lock(mutex_); + WaitForWrite(lock); + + queue_.push_back(elem); + + Notify(); + return true; + } + bool WaitForWrite(std::unique_lock &lock) { // NOLINT + while (FullUnlocked()) { + if (empty_waiters_ != 0) { + empty_cond_.notify_one(); + } + full_waiters_++; + full_cond_.wait(lock); + full_waiters_--; } - cv_.notify_one(); return true; } - - bool Push(T &&elem) { - { - std::unique_lock lock(mutex_); - cv_.wait(lock, [&] { return queue_.size() < capacity_; }); - queue_.emplace_back(std::move(elem)); + bool WaitForRead(std::unique_lock &lock) { // NOLINT + while (EmptyUnlocked()) { + if (full_waiters_ != 0) { + full_cond_.notify_one(); + } + empty_waiters_++; + empty_cond_.wait(lock); + empty_waiters_--; } - cv_.notify_one(); return true; } + bool EmptyUnlocked() { return queue_.empty(); } + bool FullUnlocked() { return queue_.size() >= capacity_; } + void Notify() { + if (empty_waiters_ != 0 && (!EmptyUnlocked())) { + empty_cond_.notify_one(); + } + if (full_waiters_ != 0 && (!FullUnlocked())) { + full_cond_.notify_one(); + } + } + + bool Push(T &&elem) { + std::unique_lock lock(mutex_); + WaitForWrite(lock); + queue_.emplace_back(std::move(elem)); + + Notify(); + return true; + } T Pop() { std::unique_lock lock(mutex_); - cv_.wait(lock, [=] { return !queue_.empty(); }); + WaitForRead(lock); T rc(std::move(queue_.front())); queue_.pop_front(); - cv_.notify_one(); + Notify(); return rc; } @@ -107,11 +134,14 @@ class BlockingQueue { } private: + int empty_waiters_ = 0; + int full_waiters_ = 0; + std::condition_variable empty_cond_; + std::condition_variable full_cond_; const size_t capacity_; std::deque queue_; mutable std::mutex mutex_; - std::condition_variable cv_; }; template