提交 3987b097 编写于 作者: J Jakob Progsch

reorganised files

上级 3c5324ce
ThreadPool ThreadPool
========== ==========
A simple C++11 Thread Pool implementation A simple C++11 Thread Pool implementation.
\ No newline at end of file
The "legacy" directory contains a version that does not use std::future
but a custom Result<T> type that has essentially the same functionalty.
...@@ -7,106 +7,50 @@ ...@@ -7,106 +7,50 @@
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <future>
class ThreadPool; // need this type to "erase" the return type of the packaged task
struct any_packaged_base {
// our worker thread objects virtual void execute() = 0;
class Worker {
public:
Worker(ThreadPool &s) : pool(s) { }
void operator()();
private:
ThreadPool &pool;
}; };
template<class T> template<class R>
class Result { struct any_packaged : public any_packaged_base {
struct ResultImpl { any_packaged(std::packaged_task<R()> &&t)
ResultImpl() : value(T()), available(false) { } : task(std::move(t))
T value;
bool available;
std::mutex lock;
std::condition_variable cond;
};
public:
Result() : impl(new ResultImpl()) { }
bool available() const
{ {
std::unique_lock<std::mutex> ul(impl->lock);
return impl->available;
} }
void wait() void execute()
{ {
if(!impl) task();
return;
std::unique_lock<std::mutex> ul(impl->lock);
if(impl->available)
return;
impl->cond.wait(ul);
}
void signal() const
{
std::unique_lock<std::mutex> ul(impl->lock);
impl->available = true; impl->cond.notify_all();
}
bool valid() const
{
std::unique_lock<std::mutex> ul(impl->lock);
return static_cast<bool>(impl);
} }
std::packaged_task<R()> task;
};
T& get() class any_packaged_task {
{ public:
wait(); template<class R>
return impl->value; any_packaged_task(std::packaged_task<R()> &&task)
: ptr(new any_packaged<R>(std::move(task)))
{
} }
void set(T v) const void operator()()
{ {
std::unique_lock<std::mutex> ul(impl->lock); ptr->execute();
impl->value = v;
} }
private: private:
std::shared_ptr<ResultImpl> impl; std::shared_ptr<any_packaged_base> ptr;
}; };
template<> class ThreadPool;
class Result<void> {
struct ResultImpl { // our worker thread objects
ResultImpl() : available(false) { } class Worker {
bool available;
std::mutex lock;
std::condition_variable cond;
};
public: public:
Result() : impl(new ResultImpl()) { } Worker(ThreadPool &s) : pool(s) { }
void operator()();
bool available() const
{
std::unique_lock<std::mutex> ul(impl->lock);
return impl->available;
}
void wait()
{
if(!impl)
return;
std::unique_lock<std::mutex> ul(impl->lock);
if(impl->available)
return;
impl->cond.wait(ul);
}
void signal() const
{
std::unique_lock<std::mutex> ul(impl->lock);
impl->available = true; impl->cond.notify_all();
}
bool valid() const
{
std::unique_lock<std::mutex> ul(impl->lock);
return static_cast<bool>(impl);
}
private: private:
std::shared_ptr<ResultImpl> impl; ThreadPool &pool;
}; };
// the actual thread pool // the actual thread pool
...@@ -114,7 +58,7 @@ class ThreadPool { ...@@ -114,7 +58,7 @@ class ThreadPool {
public: public:
ThreadPool(size_t); ThreadPool(size_t);
template<class T, class F> template<class T, class F>
Result<T> enqueue(F f); std::future<T> enqueue(F f);
~ThreadPool(); ~ThreadPool();
private: private:
friend class Worker; friend class Worker;
...@@ -122,7 +66,7 @@ private: ...@@ -122,7 +66,7 @@ private:
// need to keep track of threads so we can join them // need to keep track of threads so we can join them
std::vector< std::thread > workers; std::vector< std::thread > workers;
// the task queue // the task queue
std::deque< std::function<void()> > tasks; std::deque< any_packaged_task > tasks;
// synchronization // synchronization
std::mutex queue_mutex; std::mutex queue_mutex;
...@@ -132,18 +76,16 @@ private: ...@@ -132,18 +76,16 @@ private:
void Worker::operator()() void Worker::operator()()
{ {
std::function<void()> task;
while(true) while(true)
{ {
{ std::unique_lock<std::mutex> lock(pool.queue_mutex);
std::unique_lock<std::mutex> lock(pool.queue_mutex); while(!pool.stop && pool.tasks.empty())
while(!pool.stop && pool.tasks.empty()) pool.condition.wait(lock);
pool.condition.wait(lock); if(pool.stop)
if(pool.stop) return;
return; any_packaged_task task(pool.tasks.front());
task = pool.tasks.front(); pool.tasks.pop_front();
pool.tasks.pop_front(); lock.unlock();
}
task(); task();
} }
} }
...@@ -156,36 +98,15 @@ ThreadPool::ThreadPool(size_t threads) ...@@ -156,36 +98,15 @@ ThreadPool::ThreadPool(size_t threads)
workers.push_back(std::thread(Worker(*this))); workers.push_back(std::thread(Worker(*this)));
} }
template<class T, class F>
struct CallAndSet {
void operator()(const Result<T> &res, const F f)
{
res.set(f());
res.signal();
}
};
template<class F>
struct CallAndSet<void,F> {
void operator()(const Result<void> &res, const F &f)
{
f();
res.signal();
}
};
// add new work item to the pool // add new work item to the pool
template<class T, class F> template<class T, class F>
Result<T> ThreadPool::enqueue(F f) std::future<T> ThreadPool::enqueue(F f)
{ {
Result<T> res; std::packaged_task<T()> task(f);
std::future<T> res= task.get_future();
{ {
std::unique_lock<std::mutex> lock(queue_mutex); std::unique_lock<std::mutex> lock(queue_mutex);
tasks.push_back(std::function<void()>( tasks.push_back(any_packaged_task(std::move(task)));
[f,res]()
{
CallAndSet<T,F>()(res, f);
}));
} }
condition.notify_one(); condition.notify_one();
return res; return res;
......
...@@ -7,50 +7,106 @@ ...@@ -7,50 +7,106 @@
#include <thread> #include <thread>
#include <mutex> #include <mutex>
#include <condition_variable> #include <condition_variable>
#include <future>
// need this type to "erase" the return type of the packaged task class ThreadPool;
struct any_packaged_base {
virtual void execute() = 0; // our worker thread objects
class Worker {
public:
Worker(ThreadPool &s) : pool(s) { }
void operator()();
private:
ThreadPool &pool;
}; };
template<class R> template<class T>
struct any_packaged : public any_packaged_base { class Result {
any_packaged(std::packaged_task<R()> &&t) struct ResultImpl {
: task(std::move(t)) ResultImpl() : value(T()), available(false) { }
T value;
bool available;
std::mutex lock;
std::condition_variable cond;
};
public:
Result() : impl(new ResultImpl()) { }
bool available() const
{ {
std::unique_lock<std::mutex> ul(impl->lock);
return impl->available;
} }
void execute() void wait()
{ {
task(); if(!impl)
return;
std::unique_lock<std::mutex> ul(impl->lock);
if(impl->available)
return;
impl->cond.wait(ul);
} }
std::packaged_task<R()> task; void signal() const
};
class any_packaged_task {
public:
template<class R>
any_packaged_task(std::packaged_task<R()> &&task)
: ptr(new any_packaged<R>(std::move(task)))
{ {
std::unique_lock<std::mutex> ul(impl->lock);
impl->available = true; impl->cond.notify_all();
}
bool valid() const
{
std::unique_lock<std::mutex> ul(impl->lock);
return static_cast<bool>(impl);
} }
void operator()()
T& get()
{
wait();
return impl->value;
}
void set(T v) const
{ {
ptr->execute(); std::unique_lock<std::mutex> ul(impl->lock);
impl->value = v;
} }
private: private:
std::shared_ptr<any_packaged_base> ptr; std::shared_ptr<ResultImpl> impl;
}; };
class ThreadPool; template<>
class Result<void> {
// our worker thread objects struct ResultImpl {
class Worker { ResultImpl() : available(false) { }
bool available;
std::mutex lock;
std::condition_variable cond;
};
public: public:
Worker(ThreadPool &s) : pool(s) { } Result() : impl(new ResultImpl()) { }
void operator()();
bool available() const
{
std::unique_lock<std::mutex> ul(impl->lock);
return impl->available;
}
void wait()
{
if(!impl)
return;
std::unique_lock<std::mutex> ul(impl->lock);
if(impl->available)
return;
impl->cond.wait(ul);
}
void signal() const
{
std::unique_lock<std::mutex> ul(impl->lock);
impl->available = true; impl->cond.notify_all();
}
bool valid() const
{
std::unique_lock<std::mutex> ul(impl->lock);
return static_cast<bool>(impl);
}
private: private:
ThreadPool &pool; std::shared_ptr<ResultImpl> impl;
}; };
// the actual thread pool // the actual thread pool
...@@ -58,7 +114,7 @@ class ThreadPool { ...@@ -58,7 +114,7 @@ class ThreadPool {
public: public:
ThreadPool(size_t); ThreadPool(size_t);
template<class T, class F> template<class T, class F>
std::future<T> enqueue(F f); Result<T> enqueue(F f);
~ThreadPool(); ~ThreadPool();
private: private:
friend class Worker; friend class Worker;
...@@ -66,7 +122,7 @@ private: ...@@ -66,7 +122,7 @@ private:
// need to keep track of threads so we can join them // need to keep track of threads so we can join them
std::vector< std::thread > workers; std::vector< std::thread > workers;
// the task queue // the task queue
std::deque< any_packaged_task > tasks; std::deque< std::function<void()> > tasks;
// synchronization // synchronization
std::mutex queue_mutex; std::mutex queue_mutex;
...@@ -76,16 +132,18 @@ private: ...@@ -76,16 +132,18 @@ private:
void Worker::operator()() void Worker::operator()()
{ {
std::function<void()> task;
while(true) while(true)
{ {
std::unique_lock<std::mutex> lock(pool.queue_mutex); {
while(!pool.stop && pool.tasks.empty()) std::unique_lock<std::mutex> lock(pool.queue_mutex);
pool.condition.wait(lock); while(!pool.stop && pool.tasks.empty())
if(pool.stop) pool.condition.wait(lock);
return; if(pool.stop)
any_packaged_task task(pool.tasks.front()); return;
pool.tasks.pop_front(); task = pool.tasks.front();
lock.unlock(); pool.tasks.pop_front();
}
task(); task();
} }
} }
...@@ -98,15 +156,36 @@ ThreadPool::ThreadPool(size_t threads) ...@@ -98,15 +156,36 @@ ThreadPool::ThreadPool(size_t threads)
workers.push_back(std::thread(Worker(*this))); workers.push_back(std::thread(Worker(*this)));
} }
template<class T, class F>
struct CallAndSet {
void operator()(const Result<T> &res, const F f)
{
res.set(f());
res.signal();
}
};
template<class F>
struct CallAndSet<void,F> {
void operator()(const Result<void> &res, const F &f)
{
f();
res.signal();
}
};
// add new work item to the pool // add new work item to the pool
template<class T, class F> template<class T, class F>
std::future<T> ThreadPool::enqueue(F f) Result<T> ThreadPool::enqueue(F f)
{ {
std::packaged_task<T()> task(f); Result<T> res;
std::future<T> res= task.get_future();
{ {
std::unique_lock<std::mutex> lock(queue_mutex); std::unique_lock<std::mutex> lock(queue_mutex);
tasks.push_back(any_packaged_task(std::move(task))); tasks.push_back(std::function<void()>(
[f,res]()
{
CallAndSet<T,F>()(res, f);
}));
} }
condition.notify_one(); condition.notify_one();
return res; return res;
......
#include <iostream>
#include <vector>
#include <chrono>
#include "ThreadPool.h"
int main()
{
ThreadPool pool(4);
std::vector< Result<int> > results;
for(int i = 0; i < 8; ++i) {
results.push_back(
pool.enqueue<int>([i] {
std::cout << "hello " << i << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "world " << i << std::endl;
return i*i;
})
);
}
for(size_t i = 0;i<results.size();++i)
std::cout << results[i].get() << ' ';
std::cout << std::endl;
return 0;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册