提交 1bb6ccd7 编写于 作者: A Alexey Milovidov

Better [#METR-22410].

上级 9efc240f
......@@ -602,6 +602,7 @@ add_library (dbms
src/Common/getNumberOfPhysicalCPUCores.cpp
src/Common/StringUtils.cpp
src/Common/randomSeed.cpp
src/Common/ThreadPool.cpp
src/Core/Field.cpp
src/Core/FieldVisitors.cpp
......
......@@ -20,59 +20,27 @@ private:
using Job = std::function<void()>;
public:
ThreadPool(size_t m_size)
: m_size(m_size)
{
threads.reserve(m_size);
for (size_t i = 0; i < m_size; ++i)
threads.emplace_back([this] { worker(); });
}
void schedule(Job job)
{
{
std::unique_lock<std::mutex> lock(mutex);
has_free_thread.wait(lock, [this] { return active_jobs < m_size || shutdown; });
if (shutdown)
return;
jobs.push(std::move(job));
++active_jobs;
}
has_new_job_or_shutdown.notify_one();
}
void wait()
{
{
std::unique_lock<std::mutex> lock(mutex);
has_free_thread.wait(lock, [this] { return active_jobs == 0 || shutdown; });
if (!exceptions.empty())
std::rethrow_exception(exceptions.front());
}
}
~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(mutex);
shutdown = true;
}
has_new_job_or_shutdown.notify_all();
for (auto & thread : threads)
thread.join();
}
/// Size is constant, all threads are created immediately.
ThreadPool(size_t m_size);
/// Add new job. Locks until free thread in pool become available or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
void schedule(Job job);
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitary order.
/// If any thread was throw an exception, it will be rethrown from this method.
/// List of exceptions is not cleared: on subsequent calls to wait, same exception will be rethrown.
void wait();
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
/// You should not destroy object while calling schedule or wait methods from another threads.
~ThreadPool();
size_t size() const { return m_size; }
size_t active() const
{
std::unique_lock<std::mutex> lock(mutex);
return active_jobs;
}
/// Returns number of active jobs.
size_t active() const;
private:
mutable std::mutex mutex;
......@@ -85,54 +53,9 @@ private:
std::queue<Job> jobs;
std::vector<std::thread> threads;
std::vector<std::exception_ptr> exceptions; /// NOTE Saving many exceptions but rethrow just first one.
void worker()
{
while (true)
{
Job job;
{
std::unique_lock<std::mutex> lock(mutex);
has_new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
if (!shutdown)
{
job = std::move(jobs.front());
jobs.pop();
}
else
{
return;
}
}
try
{
job();
}
catch (...)
{
{
std::unique_lock<std::mutex> lock(mutex);
exceptions.push_back(std::current_exception());
shutdown = true;
--active_jobs;
}
has_free_thread.notify_one();
has_new_job_or_shutdown.notify_all();
return;
}
{
std::unique_lock<std::mutex> lock(mutex);
--active_jobs;
}
has_free_thread.notify_one();
}
}
std::vector<std::exception_ptr> exceptions; /// NOTE Saving many exceptions but we rethrow just first one.
void worker();
};
#include <DB/Common/ThreadPool.h>
ThreadPool::ThreadPool(size_t m_size)
: m_size(m_size)
{
threads.reserve(m_size);
for (size_t i = 0; i < m_size; ++i)
threads.emplace_back([this] { worker(); });
}
void ThreadPool::schedule(Job job)
{
{
std::unique_lock<std::mutex> lock(mutex);
has_free_thread.wait(lock, [this] { return active_jobs < m_size || shutdown; });
if (shutdown)
return;
jobs.push(std::move(job));
++active_jobs;
}
has_new_job_or_shutdown.notify_one();
}
void ThreadPool::wait()
{
{
std::unique_lock<std::mutex> lock(mutex);
has_free_thread.wait(lock, [this] { return active_jobs == 0 || shutdown; });
if (!exceptions.empty())
std::rethrow_exception(exceptions.front());
}
}
ThreadPool::~ThreadPool()
{
{
std::unique_lock<std::mutex> lock(mutex);
shutdown = true;
}
has_new_job_or_shutdown.notify_all();
for (auto & thread : threads)
thread.join();
}
size_t ThreadPool::active() const
{
std::unique_lock<std::mutex> lock(mutex);
return active_jobs;
}
void ThreadPool::worker()
{
while (true)
{
Job job;
{
std::unique_lock<std::mutex> lock(mutex);
has_new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
if (!shutdown)
{
job = std::move(jobs.front());
jobs.pop();
}
else
{
return;
}
}
try
{
job();
}
catch (...)
{
{
std::unique_lock<std::mutex> lock(mutex);
exceptions.push_back(std::current_exception());
shutdown = true;
--active_jobs;
}
has_free_thread.notify_one();
has_new_job_or_shutdown.notify_all();
return;
}
{
std::unique_lock<std::mutex> lock(mutex);
--active_jobs;
}
has_free_thread.notify_one();
}
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册