提交 a6bf5a7e 编写于 作者: A Alexey Milovidov

Addition to prev. revision

上级 0afee1df
#pragma once
#include <mutex>
#include <memory>
/** Allow to store and read-only usage of an object in several threads,
* and to atomically replace an object in another thread.
* The replacement is atomic and reading threads can work with different versions of an object.
*
* Usage:
* MultiVersion<T> x;
* - on data update:
* x.set(new value);
* - on read-only usage:
* {
* MultiVersion<T>::Version current_version = x.get();
* // use *current_version
* } // now we finish own current version; if the version is outdated and no one else is using it - it will be destroyed.
*
* All methods are thread-safe.
*/
template <typename T>
class MultiVersion
{
public:
/// Version of object for usage. shared_ptr manage lifetime of version.
using Version = std::shared_ptr<const T>;
/// Default initialization - by nullptr.
MultiVersion() = default;
MultiVersion(std::unique_ptr<const T> && value)
{
set(std::move(value));
}
/// Obtain current version for read-only usage. Returns shared_ptr, that manages lifetime of version.
Version get() const
{
/// NOTE: is it possible to lock-free replace of shared_ptr?
std::lock_guard lock(mutex);
return current_version;
}
/// Update an object with new version.
void set(std::unique_ptr<const T> && value)
{
std::lock_guard lock(mutex);
current_version = std::move(value);
}
private:
Version current_version;
mutable std::mutex mutex;
};
#include <Common/ThreadPool.h>
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads)
: ThreadPoolImpl(num_threads, num_threads)
{
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t num_threads, size_t queue_size)
: num_threads(num_threads), queue_size(queue_size)
{
threads.reserve(num_threads);
}
template <typename Thread>
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
{
{
std::unique_lock<std::mutex> lock(mutex);
job_finished.wait(lock, [this] { return !queue_size || active_jobs < queue_size || shutdown; });
if (shutdown)
return;
jobs.emplace(std::move(job), priority);
++active_jobs;
if (threads.size() < std::min(num_threads, active_jobs))
threads.emplace_back([this] { worker(); });
}
new_job_or_shutdown.notify_one();
}
template <typename Thread>
void ThreadPoolImpl<Thread>::wait()
{
{
std::unique_lock<std::mutex> lock(mutex);
job_finished.wait(lock, [this] { return active_jobs == 0; });
if (first_exception)
{
std::exception_ptr exception;
std::swap(exception, first_exception);
std::rethrow_exception(exception);
}
}
}
template <typename Thread>
ThreadPoolImpl<Thread>::~ThreadPoolImpl()
{
finalize();
}
template <typename Thread>
void ThreadPoolImpl<Thread>::finalize()
{
{
std::unique_lock<std::mutex> lock(mutex);
shutdown = true;
}
new_job_or_shutdown.notify_all();
for (auto & thread : threads)
thread.join();
threads.clear();
}
template <typename Thread>
size_t ThreadPoolImpl<Thread>::active() const
{
std::unique_lock<std::mutex> lock(mutex);
return active_jobs;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::worker()
{
while (true)
{
Job job;
bool need_shutdown = false;
{
std::unique_lock<std::mutex> lock(mutex);
new_job_or_shutdown.wait(lock, [this] { return shutdown || !jobs.empty(); });
need_shutdown = shutdown;
if (!jobs.empty())
{
job = jobs.top().job;
jobs.pop();
}
else
{
return;
}
}
if (!need_shutdown)
{
try
{
job();
}
catch (...)
{
{
std::unique_lock<std::mutex> lock(mutex);
if (!first_exception)
first_exception = std::current_exception();
shutdown = true;
--active_jobs;
}
job_finished.notify_all();
new_job_or_shutdown.notify_all();
return;
}
}
{
std::unique_lock<std::mutex> lock(mutex);
--active_jobs;
}
job_finished.notify_all();
}
}
template class ThreadPoolImpl<std::thread>;
template class ThreadPoolImpl<ThreadFromGlobalPool>;
void ExceptionHandler::setException(std::exception_ptr && exception)
{
std::unique_lock<std::mutex> lock(mutex);
if (!first_exception)
first_exception = std::move(exception);
}
void ExceptionHandler::throwIfException()
{
std::unique_lock<std::mutex> lock(mutex);
if (first_exception)
std::rethrow_exception(first_exception);
}
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler)
{
return [job{std::move(job)}, &handler] ()
{
try
{
job();
}
catch (...)
{
handler.setException(std::current_exception());
}
};
}
#pragma once
#include <cstdint>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <queue>
#include <vector>
#include <ext/singleton.h>
/** Very simple thread pool similar to boost::threadpool.
* Advantages:
* - catches exceptions and rethrows on wait.
*/
template <typename Thread>
class ThreadPoolImpl
{
public:
using Job = std::function<void()>;
/// Size is constant. Up to num_threads are created on demand and then run until shutdown.
explicit ThreadPoolImpl(size_t num_threads);
/// queue_size - maximum number of running plus scheduled jobs. It can be greater than num_threads. Zero means unlimited.
ThreadPoolImpl(size_t num_threads, size_t queue_size);
/// Add new job. Locks until number of active jobs is less than maximum or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
/// Priority: greater is higher.
void schedule(Job job, int priority = 0);
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitary order.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
void wait();
/// Waits for all threads. Doesn't rethrow exceptions (use 'wait' method to rethrow exceptions).
/// You should not destroy object while calling schedule or wait methods from another threads.
~ThreadPoolImpl();
size_t size() const { return num_threads; }
/// Returns number of running and scheduled jobs.
size_t active() const;
private:
mutable std::mutex mutex;
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;
const size_t num_threads;
const size_t queue_size;
size_t active_jobs = 0;
bool shutdown = false;
struct JobWithPriority
{
Job job;
int priority;
JobWithPriority(Job job, int priority)
: job(job), priority(priority) {}
bool operator< (const JobWithPriority & rhs) const
{
return priority < rhs.priority;
}
};
std::priority_queue<JobWithPriority> jobs;
std::vector<Thread> threads;
std::exception_ptr first_exception;
void worker();
void finalize();
};
using FreeThreadPool = ThreadPoolImpl<std::thread>;
class GlobalThreadPool : public FreeThreadPool, public ext::singleton<GlobalThreadPool>
{
public:
GlobalThreadPool() : FreeThreadPool(10000) {} /// TODO: global blocking limit may lead to deadlocks.
};
class ThreadFromGlobalPool
{
public:
ThreadFromGlobalPool(std::function<void()> func)
{
GlobalThreadPool::instance().schedule(func);
}
void join()
{
/// noop, the std::thread will continue to run inside global pool.
}
};
using ThreadPool = ThreadPoolImpl<ThreadFromGlobalPool>;
/// Allows to save first catched exception in jobs and postpone its rethrow.
class ExceptionHandler
{
public:
void setException(std::exception_ptr && exception);
void throwIfException();
private:
std::exception_ptr first_exception;
std::mutex mutex;
};
ThreadPool::Job createExceptionHandledJob(ThreadPool::Job job, ExceptionHandler & handler);
......@@ -58,6 +58,7 @@ target_link_libraries (thread_pool_2 PRIVATE clickhouse_common_io)
add_executable (multi_version multi_version.cpp)
target_link_libraries (multi_version PRIVATE clickhouse_common_io)
add_check(multi_version)
add_executable (array_cache array_cache.cpp)
target_link_libraries (array_cache PRIVATE clickhouse_common_io)
......
#include <string.h>
#include <iostream>
#include <functional>
#include <Common/ThreadPool.h>
#include <Common/MultiVersion.h>
#include <Poco/Exception.h>
using T = std::string;
using MV = MultiVersion<T>;
using Results = std::vector<T>;
void thread1(MV & x, T & result)
{
MV::Version v = x.get();
result = *v;
}
void thread2(MV & x, const char * result)
{
x.set(std::make_unique<T>(result));
}
int main(int argc, char ** argv)
{
try
{
const char * s1 = "Hello!";
const char * s2 = "Goodbye!";
size_t n = 1000;
MV x(std::make_unique<T>(s1));
Results results(n);
ThreadPool tp(8);
for (size_t i = 0; i < n; ++i)
{
tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.schedule(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
}
tp.wait();
for (size_t i = 0; i < n; ++i)
std::cerr << results[i] << " ";
std::cerr << std::endl;
}
catch (const Poco::Exception & e)
{
std::cerr << e.message() << std::endl;
throw;
}
return 0;
}
......@@ -17,7 +17,6 @@ target_link_libraries (date_lut4 common ${PLATFORM_LIBS})
target_link_libraries (date_lut_default_timezone common ${PLATFORM_LIBS})
target_link_libraries (local_date_time_comparison common)
target_link_libraries (realloc-perf common)
add_check(multi_version)
add_check(local_date_time_comparison)
add_executable (unit_tests_libcommon gtest_json_test.cpp gtest_strong_typedef.cpp gtest_find_symbols.cpp)
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册