提交 467f9ce2 编写于 作者: A Alexey Milovidov

Fixed ThreadPool when there are too many inactive threads #4485

上级 c26657ce
......@@ -188,6 +188,7 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
if (threads.size() > scheduled_jobs + max_free_threads)
{
thread_it->detach();
threads.erase(thread_it);
job_finished.notify_all();
return;
......
......@@ -134,10 +134,11 @@ public:
template <typename Function, typename... Args>
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
{
mutex = std::make_unique<std::mutex>();
mutex = std::make_shared<std::mutex>();
/// The function object must be copyable, so we wrap lock_guard in shared_ptr.
GlobalThreadPool::instance().scheduleOrThrow([
mutex = mutex,
lock = std::make_shared<std::lock_guard<std::mutex>>(*mutex),
func = std::forward<Function>(func),
args = std::make_tuple(std::forward<Args>(args)...)]
......@@ -154,7 +155,7 @@ public:
ThreadFromGlobalPool & operator=(ThreadFromGlobalPool && rhs)
{
if (mutex)
if (joinable())
std::terminate();
mutex = std::move(rhs.mutex);
return *this;
......@@ -162,25 +163,34 @@ public:
~ThreadFromGlobalPool()
{
if (mutex)
if (joinable())
std::terminate();
}
void join()
{
if (!joinable())
std::terminate();
{
std::lock_guard lock(*mutex);
}
mutex.reset();
}
void detach()
{
if (!joinable())
std::terminate();
mutex.reset();
}
bool joinable() const
{
return static_cast<bool>(mutex);
}
private:
std::unique_ptr<std::mutex> mutex; /// Object must be moveable.
std::shared_ptr<std::mutex> mutex; /// Object must be moveable.
};
......
......@@ -56,6 +56,9 @@ target_link_libraries (thread_pool PRIVATE clickhouse_common_io)
add_executable (thread_pool_2 thread_pool_2.cpp)
target_link_libraries (thread_pool_2 PRIVATE clickhouse_common_io)
add_executable (thread_pool_3 thread_pool_3.cpp)
target_link_libraries (thread_pool_3 PRIVATE clickhouse_common_io)
add_executable (multi_version multi_version.cpp)
target_link_libraries (multi_version PRIVATE clickhouse_common_io)
add_check(multi_version)
......
#include <atomic>
#include <iostream>
#include <Common/ThreadPool.h>
/// Test for thread self-removal when number of free threads in pool is too large.
/// Just checks that nothing weird happens.
template <typename Pool>
void test()
{
Pool pool(10, 2, 10);
for (size_t i = 0; i < 10; ++i)
pool.schedule([]{ std::cerr << '.'; });
pool.wait();
}
int main(int, char **)
{
test<FreeThreadPool>();
std::cerr << '\n';
test<ThreadPool>();
std::cerr << '\n';
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册