未验证 提交 b1486015 编写于 作者: A alexey-milovidov 提交者: GitHub

Merge pull request #6301 from yandex/fix-thread-pool-hang-if-full

Fixed the possibility of hanging queries when server is overloaded
#include <Common/ThreadPool.h>
#include <Common/Exception.h>
#include <iostream>
#include <type_traits>
......@@ -34,6 +33,28 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threa
{
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
std::lock_guard lock(mutex);
max_threads = value;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{
std::lock_guard lock(mutex);
max_free_threads = value;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
{
std::lock_guard lock(mutex);
queue_size = value;
}
template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
......@@ -59,7 +80,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
if (wait_microseconds)
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error();
......@@ -83,6 +104,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
catch (...)
{
threads.pop_front();
/// Remove the job and return error to caller.
/// Note that if we have allocated at least one thread, we may continue
/// (one thread is enough to process all jobs).
/// But this condition indicate an error nevertheless and better to refuse.
jobs.pop();
--scheduled_jobs;
return on_error();
}
}
}
......
......@@ -60,14 +60,18 @@ public:
/// Returns number of running and scheduled jobs.
size_t active() const;
void setMaxThreads(size_t value);
void setMaxFreeThreads(size_t value);
void setQueueSize(size_t value);
private:
mutable std::mutex mutex;
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;
const size_t max_threads;
const size_t max_free_threads;
const size_t queue_size;
size_t max_threads;
size_t max_free_threads;
size_t queue_size;
size_t scheduled_jobs = 0;
bool shutdown = false;
......
#include <atomic>
#include <Common/ThreadPool.h>
#include <gtest/gtest.h>
/// Test what happens if local ThreadPool cannot create a ThreadFromGlobalPool.
/// There was a bug: if local ThreadPool cannot allocate even a single thread,
/// the job will be scheduled but never get executed.
TEST(ThreadPool, GlobalFull1)
{
GlobalThreadPool & global_pool = GlobalThreadPool::instance();
static constexpr size_t capacity = 5;
global_pool.setMaxThreads(capacity);
global_pool.setMaxFreeThreads(1);
global_pool.setQueueSize(capacity);
global_pool.wait();
std::atomic<size_t> counter = 0;
static constexpr size_t num_jobs = capacity + 1;
auto func = [&] { ++counter; while (counter != num_jobs) {} };
ThreadPool pool(num_jobs);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
for (size_t i = capacity; i < num_jobs; ++i)
{
EXPECT_THROW(pool.schedule(func), DB::Exception);
++counter;
}
pool.wait();
EXPECT_EQ(counter, num_jobs);
global_pool.setMaxThreads(10000);
global_pool.setMaxFreeThreads(1000);
global_pool.setQueueSize(10000);
}
TEST(ThreadPool, GlobalFull2)
{
GlobalThreadPool & global_pool = GlobalThreadPool::instance();
static constexpr size_t capacity = 5;
global_pool.setMaxThreads(capacity);
global_pool.setMaxFreeThreads(1);
global_pool.setQueueSize(capacity);
/// ThreadFromGlobalPool from local thread pools from previous test case have exited
/// but their threads from global_pool may not have finished (they still have to exit).
/// If we will not wait here, we can get "Cannot schedule a task exception" earlier than we expect in this test.
global_pool.wait();
std::atomic<size_t> counter = 0;
auto func = [&] { ++counter; while (counter != capacity + 1) {} };
ThreadPool pool(capacity, 0, capacity);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
ThreadPool another_pool(1);
EXPECT_THROW(another_pool.schedule(func), DB::Exception);
++counter;
pool.wait();
global_pool.wait();
for (size_t i = 0; i < capacity; ++i)
another_pool.schedule([&] { ++counter; });
another_pool.wait();
EXPECT_EQ(counter, capacity * 2 + 1);
global_pool.setMaxThreads(10000);
global_pool.setMaxFreeThreads(1000);
global_pool.setQueueSize(10000);
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册