diff --git a/paddle/fluid/framework/new_executor/interpretercore_util.cc b/paddle/fluid/framework/new_executor/interpretercore_util.cc
index a704411f3bb713421dc23903112eacd0de363b57..59703332efe9594c3f2130eeff79bea6c690839e 100644
--- a/paddle/fluid/framework/new_executor/interpretercore_util.cc
+++ b/paddle/fluid/framework/new_executor/interpretercore_util.cc
@@ -39,6 +39,7 @@ constexpr size_t kPrepareWorkQueueIdx = 2;
 
 void AsyncWorkQueue::AddTask(const OpFuncType& op_func_type,
                              std::function<void()> fn) {
+  VLOG(4) << "Add task: " << static_cast<int>(op_func_type) << " ";
   // NOTE(zhiqiu): use thhe second queue of size of, so only one thread is used.
   if (FLAGS_new_executor_sequential_run) {
     VLOG(4) << "FLAGS_new_executor_sequential_run:"
diff --git a/paddle/fluid/framework/new_executor/workqueue/event_count.h b/paddle/fluid/framework/new_executor/workqueue/event_count.h
index 893c6d2d54ac7209d284a909ae845ba14c2d1cf5..7a826c3990713d272e85c688ec20134cf4ba9ac3 100644
--- a/paddle/fluid/framework/new_executor/workqueue/event_count.h
+++ b/paddle/fluid/framework/new_executor/workqueue/event_count.h
@@ -54,6 +54,7 @@
 #include <atomic>
 #include <cassert>
 #include <cstdlib>
+#include "glog/logging.h"
 
 namespace paddle {
 namespace framework {
@@ -255,6 +256,7 @@ class EventCount {
     std::unique_lock<std::mutex> lock(w->mu);
     while (w->state != Waiter::kSignaled) {
       w->state = Waiter::kWaiting;
+      VLOG(10) << "Go to wait " << &(w->cv);
       w->cv.wait(lock);
     }
   }
@@ -270,7 +272,10 @@ class EventCount {
       w->state = Waiter::kSignaled;
     }
     // Avoid notifying if it wasn't waiting.
-    if (state == Waiter::kWaiting) w->cv.notify_one();
+    if (state == Waiter::kWaiting) {
+      VLOG(10) << "Go to notify " << &(w->cv);
+      w->cv.notify_one();
+    }
   }
 }
};
diff --git a/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h b/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h
index bc65231abe7371a931f709c9190b55fde24f0543..ad5322f8d705e961934aa08242f1f65975c4f3f9 100644
--- a/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h
+++ b/paddle/fluid/framework/new_executor/workqueue/nonblocking_threadpool.h
@@ -53,7 +53,6 @@ class ThreadPoolTempl {
     all_coprimes_.reserve(num_threads_);
     for (int i = 1; i <= num_threads_; ++i) {
       all_coprimes_.emplace_back();
-      all_coprimes_.back().push_back(i);
       ComputeCoprimes(i, &(all_coprimes_.back()));
     }
     for (int i = 0; i < num_threads_; i++) {
@@ -130,8 +129,11 @@ class ThreadPoolTempl {
     // this. We expect that such scenario is prevented by program, that is,
     // this is kept alive while any threads can potentially be in Schedule.
     if (!t.f) {
-      if (num_tasks > num_threads_ - blocked_.load(std::memory_order_relaxed)) {
+      if (num_tasks > num_threads_ - blocked_) {
+        VLOG(6) << "Add task, Notify";
         ec_.Notify(false);
+      } else {
+        VLOG(6) << "Add task, No Notify";
       }
     } else {
       num_tasks_.fetch_sub(1, std::memory_order_relaxed);
@@ -376,17 +378,21 @@ class ThreadPoolTempl {
       ec_.CancelWait();
       return false;
     }
+
+    // Number of blocked threads is used as termination condition.
+    // If we are shutting down and all worker threads blocked without work,
+    // that's we are done.
+    blocked_++;
+
     // Now do a reliable emptiness check.
     int victim = NonEmptyQueueIndex();
     if (victim != -1) {
       ec_.CancelWait();
       *t = thread_data_[victim].queue.PopBack();
+      blocked_--;
       return true;
     }
-    // Number of blocked threads is used as termination condition.
-    // If we are shutting down and all worker threads blocked without work,
-    // that's we are done.
-    blocked_++;
+
     if (done_ && blocked_ == static_cast<unsigned>(num_threads_)) {
       ec_.CancelWait();
       // Almost done, but need to re-check queues.