未验证 提交 6449faec 编写于 作者: Q Qiao Longfei 提交者: GitHub

Merge pull request #14259 from jacquesqiao/optimize-thread-pool

Optimize thread pool
......@@ -57,10 +57,10 @@ ThreadPool::ThreadPool(int num_threads) : running_(true) {
ThreadPool::~ThreadPool() {
{
// notify all threads to stop running
std::lock_guard<std::mutex> l(mutex_);
std::unique_lock<std::mutex> l(mutex_);
running_ = false;
scheduled_.notify_all();
}
scheduled_.notify_all();
for (auto& t : threads_) {
t->join();
......@@ -70,19 +70,25 @@ ThreadPool::~ThreadPool() {
void ThreadPool::TaskLoop() {
while (true) {
std::unique_lock<std::mutex> lock(mutex_);
Task task;
scheduled_.wait(
lock, [this] { return !this->tasks_.empty() || !this->running_; });
{
std::unique_lock<std::mutex> lock(mutex_);
scheduled_.wait(
lock, [this] { return !this->tasks_.empty() || !this->running_; });
if (!running_ || tasks_.empty()) {
return;
}
if (!running_ && tasks_.empty()) {
return;
}
if (tasks_.empty()) {
PADDLE_THROW("This thread has no task to Run");
}
// pop a task from the task queue
auto task = std::move(tasks_.front());
tasks_.pop();
lock.unlock();
// pop a task from the task queue
task = std::move(tasks_.front());
tasks_.pop();
}
// run the task
task();
......
......@@ -58,7 +58,7 @@ class ThreadPool {
~ThreadPool();
// Run pushes a function to the task queue and returns a std::future
// object. To wait for the completion of the task, call
// object. To wait for the completion of the task, call
// std::future::wait().
template <typename Callback>
std::future<void> Run(Callback fn) {
......@@ -69,7 +69,6 @@ class ThreadPool {
template <typename Callback>
std::future<std::unique_ptr<platform::EnforceNotMet>> RunAndGetException(
Callback fn) {
std::unique_lock<std::mutex> lock(mutex_);
Task task([fn]() -> std::unique_ptr<platform::EnforceNotMet> {
try {
fn();
......@@ -84,7 +83,13 @@ class ThreadPool {
return nullptr;
});
std::future<std::unique_ptr<platform::EnforceNotMet>> f = task.get_future();
tasks_.push(std::move(task));
{
std::unique_lock<std::mutex> lock(mutex_);
if (!running_) {
PADDLE_THROW("enqueue on stopped ThreadPool");
}
tasks_.push(std::move(task));
}
scheduled_.notify_one();
return f;
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册