From f581f5bf69b929712c7df8513c399beda114005e Mon Sep 17 00:00:00 2001 From: Leo Chen Date: Sat, 9 Apr 2022 09:46:11 +0800 Subject: [PATCH] [new-exec] fix bug that no thread is waked up when adding task to threadpool (#41567) * fix bug that no thread is waked up when adding task to threadpool * fix typo --- .../new_executor/interpretercore_util.cc | 1 + .../new_executor/workqueue/event_count.h | 7 ++++++- .../workqueue/nonblocking_threadpool.h | 18 ++++++++++++------ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc index a704411f3bb..59703332efe 100644 --- a/paddle/fluid/framework/new_executor/interpretercore_util.cc +++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc @@ -39,6 +39,7 @@ constexpr size_t kPrepareWorkQueueIdx = 2; void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type, std::function fn) { + VLOG(4) << "Add task: " << static_cast(op_func_type) << " "; // NOTE(zhiqiu): use thhe second queue of size of, so only one thread is used. if (FLAGS_new_executor_sequential_run) { VLOG(4) << "FLAGS_new_executor_sequential_run:" diff --git a/paddle/fluid/framework/new_executor/workqueue/event_count.h b/paddle/fluid/framework/new_executor/workqueue/event_count.h index 893c6d2d54a..7a826c39907 100644 --- a/paddle/fluid/framework/new_executor/workqueue/event_count.h +++ b/paddle/fluid/framework/new_executor/workqueue/event_count.h @@ -54,6 +54,7 @@ #include #include #include +#include "glog/logging.h" namespace paddle { namespace framework { @@ -255,6 +256,7 @@ class EventCount { std::unique_lock lock(w->mu); while (w->state != Waiter::kSignaled) { w->state = Waiter::kWaiting; + VLOG(10) << "Go to wait " << &(w->cv); w->cv.wait(lock); } } @@ -270,7 +272,10 @@ class EventCount { w->state = Waiter::kSignaled; } // Avoid notifying if it wasn't waiting. - if (state == Waiter::kWaiting) w->cv.notify_one(); + if (state == Waiter::kWaiting) { + VLOG(10) << "Go to notify " << &(w->cv); + w->cv.notify_one(); + } } } }; diff --git a/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h b/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h index 384498584c6..44953fa192e 100644 --- a/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h +++ b/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h @@ -53,7 +53,6 @@ class ThreadPoolTempl { all_coprimes_.reserve(num_threads_); for (int i = 1; i <= num_threads_; ++i) { all_coprimes_.emplace_back(); - all_coprimes_.back().push_back(i); ComputeCoprimes(i, &(all_coprimes_.back())); } for (int i = 0; i < num_threads_; i++) { @@ -130,8 +129,11 @@ class ThreadPoolTempl { // this. We expect that such scenario is prevented by program, that is, // this is kept alive while any threads can potentially be in Schedule. if (!t.f) { - if (num_tasks > num_threads_ - blocked_.load(std::memory_order_relaxed)) { + if (num_tasks > num_threads_ - blocked_) { + VLOG(6) << "Add task, Notify"; ec_.Notify(false); + } else { + VLOG(6) << "Add task, No Notify"; } } else { num_tasks_.fetch_sub(1, std::memory_order_relaxed); @@ -376,17 +378,21 @@ class ThreadPoolTempl { ec_.CancelWait(); return false; } + + // Number of blocked threads is used as termination condition. + // If we are shutting down and all worker threads blocked without work, + // that's we are done. + blocked_++; + // Now do a reliable emptiness check. int victim = NonEmptyQueueIndex(); if (victim != -1) { ec_.CancelWait(); *t = thread_data_[victim].queue.PopBack(); + blocked_--; return true; } - // Number of blocked threads is used as termination condition. - // If we are shutting down and all worker threads blocked without work, - // that's we are done. - blocked_++; + if (done_ && blocked_ == static_cast(num_threads_)) { ec_.CancelWait(); // Almost done, but need to re-check queues. -- GitLab