提交 05c113f2 编写于 作者: cosmicing's avatar cosmicing

更新00-ProgramElement/2-ThreadPool/examples/Jamfile,...

更新00-ProgramElement/2-ThreadPool/examples/Jamfile, 00-ProgramElement/2-ThreadPool/examples/threadpool_sample.cpp, 00-ProgramElement/2-ThreadPool/include/thread_pool.h, 00-ProgramElement/2-ThreadPool/test/Jamfile, 00-ProgramElement/2-ThreadPool/test/threadpool_unit.cpp, 00-ProgramElement/2-ThreadPool/Jamroot, 00-ProgramElement/2-ThreadPool/README.md
上级 70826daf
project
: usage-requirements <include>include
;
build-project test ;
build-project examples ;
#Threadpool
##调用示例
```c++
#include "thread_pool.h"
#include <iostream>
void task1() {
// do something
}
void task2(int a, int b, int& result) {
result = a + b;
}
int main()
{
Add::Threadpool pool;
pool.Init(10);
pool.Start();
pool.PostTask(task1);
int addResult = 0;
pool.PostTask(std::bind(task2, 3, 5, std::ref(addResult)));
pool.WaitTasks();
std::cout << "add result: " << addResult << std::endl;
pool.Stop();
return 0;
}
```
##接口说明
* 构造线程池对象
```c++
Threadpool();
```
* 销毁线程池对象
```c++
~Threadpool();
```
* 初始化线程池
* 参数:
* number: int, 线程池容纳的线程数目
* 返回值:
* true: 成功初始化
* false: 初始化失败。可能已经初始化过了。
```c++
bool Init(size_t number);
```
* 启动线程池里各个线程
* 启动后,各个线程会阻塞,等待任务到来。处理完任务,又会阻塞,等待其他任务
* 参数:
*
* 返回值:
* true: 成功启动
* false: 启动失败。可能没有初始化,或者已经启动过了
```c++
bool Start();
```
* 停止线程池里各个线程
* 此接口会阻塞,直到各个线程合并。如果此后需要用线程池,需要重新Start
* 参数:
*
* 返回值:
* true: 成功停止
* false: 停止失败。可能没有初始化,或者已经停止过了
```c++
bool Stop();
```
* 向线程池递交任务
* 仅仅递交任务马上返回,不会等待任务完成
* 参数:
*
* 返回值:
* true: 成功递交任务
* false: 递交任务失败,可能线程在某处已经调用Stop了。
bool PostTask(std::function<void()> task);
* 等待各个任务处理结束
* 某些使用场景,在递交任务后,可能需要各个任务完成
* 参数:
*
* 返回值:
*
```c++
void WaitTasks();
```
* 取消已经递交的任务
* 任务递交后,如果尚未执行,可以用此接口取消。如果已经执行,则不会。
* 参数:
*
* 返回值:
*
```c++
void CancelTasks();
```
* 获取线程池线程数目
* 参数:
*
* 返回值:
* size_t的数值,代表线程池初始化时,传过来的数目,即其中的线程数
```c++
size_t GetThreadsNumber() const;
```
* 获取线程池任务数目
* 参数:
*
* 返回值:
* size_t的数值,代表递交到线程池,尚未来得及处理完的数目
```c++
size_t GetTasksNumber() const;
```
exe threadpool_sample : threadpool_sample.cpp : <include>../include <linkflags>-pthread ;
#include "thread_pool.h"
#include <chrono>
#include <iostream>
#include <string>
#include <boost/format.hpp>
std::mutex g_printMtx;
void test1() {
for (int i = 0; i < 500; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
auto temp = boost::format("test1\n");
std::lock_guard<std::mutex> guard(g_printMtx);
std::cout << temp;
}
}
void test2(int a, int b) {
for (int i = 0; i < 500; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
auto c = a + b;
auto temp = boost::format("test2: %1% + %2% = %3%\n") % a % b % c;
std::lock_guard<std::mutex> guard(g_printMtx);
std::cout << temp;
}
}
void test3() {
for (int i = 0; i < 500; i++) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
auto temp = boost::format("test3\n");
std::lock_guard<std::mutex> guard(g_printMtx);
std::cout << temp;
}
}
int main()
{
Add::Threadpool pool;
pool.Init(10);
pool.Start();
pool.PostTask(test1);
pool.PostTask(std::bind(test2, 3, 5));
for (int i = 10; i < 40; i++) {
pool.PostTask(std::bind(test2, i, i * 2));
}
pool.PostTask(test3);
//std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pool.WaitTasks();
//pool.CancelTasks();
//pool.Stop();
return 0;
}
#include <atomic>
#include <mutex>
#include <functional>
#include <memory>
#include <vector>
#include <queue>
#include <thread>
#include <condition_variable>
namespace Add {
class Threadpool
{
class State {
public:
State(Threadpool* pool): pool_(pool) {}
virtual bool Init() { return false; }
virtual bool Start() { return false; }
virtual bool Stop() { return false; }
void Done() { bool expected = true; pool_->stateProcessing_.compare_exchange_strong(expected, false); }
protected:
Threadpool* pool_ { nullptr };
};
template<typename T> static State* GetStateInstance(Threadpool* pool) {
static T state(pool);
return &state;
}
class DefaultState final : public State {
public:
using State::State;
bool Init() override;
};
class InitedState final : public State {
public:
using State::State;
bool Start() override;
};
class StartedState final : public State {
public:
using State::State;
bool Stop() override;
};
friend State;
friend DefaultState;
friend InitedState;
friend StartedState;
public:
/*
* 构造线程池对象
*/
Threadpool() { state_ = GetStateInstance<DefaultState>(this); }
/*
* 销毁线程池对象
*/
~Threadpool() {
if (exit_) {
return;
}
Stop(); // not care the return value
}
/*
* 初始化线程池
* 参数:
* number: int, 线程池容纳的线程数目
* 返回值:
* true: 成功初始化
* false: 初始化失败。可能已经初始化过了。
*/
bool Init(size_t number) {
if (!state_.load()->Init()) {
return false;
}
threadsCnt_ = number;
threads_.resize(number);
state_.load()->Done();
return true;
}
/*
* 启动线程池里各个线程
* 启动后,各个线程会阻塞,等待任务到来。处理完任务,又会阻塞,等待其他任务
* 参数:
* 无
* 返回值:
* true: 成功启动
* false: 启动失败。可能没有初始化,或者已经启动过了
*/
bool Start() {
if (!state_.load()->Start()) {
return false;
}
std::unique_lock<std::mutex> lock(taskMutex_);
exit_ = false;
lock.unlock();
auto num = threadsCnt_.load();
for (size_t i = 0; i < num; i++) {
threads_[i] = std::thread(&Threadpool::Run, this);
}
state_.load()->Done();
return true;
}
/*
* 向线程池递交任务
* 仅仅递交任务马上返回,不会等待任务完成
* 参数:
* 无
* 返回值:
* true: 成功递交任务
* false: 递交任务失败,可能线程在某处已经调用Stop了。
*/
bool PostTask(std::function<void()> task) {
bool result = false;
{
std::unique_lock<std::mutex> lock(taskMutex_);
if (!exit_) {
tasks_.push(task);
taskCnt_++;
result = true;
}
}
if (result) {
taskCond_.notify_one();
}
return result;
}
/*
* 等待各个任务处理结束
* 某些使用场景,在递交任务后,可能需要各个任务完成
* 参数:
* 无
* 返回值:
* 无
*/
void WaitTasks() {
std::unique_lock<std::mutex> lock(taskMutex_);
finishCond_.wait(lock, [this] { return exit_ || taskCnt_ == 0; });
}
/*
* 取消已经递交的任务
* 任务递交后,如果尚未执行,可以用此接口取消。如果已经执行,则不会。
* 参数:
* 无
* 返回值:
* 无
*/
void CancelTasks() {
std::unique_lock<std::mutex> lock(taskMutex_);
size_t leaveCnt = tasks_.size();
taskCnt_ -= leaveCnt;
// clear tasks
std::queue<std::function<void()>> temp;
tasks_.swap(temp);
}
/*
* 停止线程池里各个线程
* 此接口会阻塞,直到各个线程合并。如果此后需要用线程池,需要重新Start
* 参数:
* 无
* 返回值:
* true: 成功停止
* false: 停止失败。可能没有初始化,或者已经停止过了
*/
bool Stop() {
if (!state_.load()->Stop()) {
return false;
}
std::unique_lock<std::mutex> lock(taskMutex_);
exit_ = true;
lock.unlock();
taskCond_.notify_all();
for (auto it = threads_.begin(); it != threads_.end(); ++it) {
it->join();
}
state_.load()->Done();
return true;
}
/*
* 获取线程池线程数目
* 参数:
* 无
* 返回值:
* size_t的数值,代表线程池初始化时,传过来的数目,即其中的线程数
*/
size_t GetThreadsNumber() const {
return threadsCnt_.load();
}
/*
* 获取线程池任务数目
* 参数:
* 无
* 返回值:
* size_t的数值,代表递交到线程池,尚未来得及处理完的数目
*/
size_t GetTasksNumber() const {
return taskCnt_.load();
}
private:
Threadpool(const Threadpool&) = delete;
Threadpool& operator=(const Threadpool&) = delete;
void Run() {
while (!exit_) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(taskMutex_);
taskCond_.wait(lock, [this] { return exit_ || !this->tasks_.empty(); });
if (exit_) {
break;
}
task = tasks_.front();
tasks_.pop();
}
task();
std::unique_lock<std::mutex> lock(taskMutex_);
taskCnt_--;
lock.unlock();
if (taskCnt_ == 0) {
finishCond_.notify_all();
}
}
}
private:
std::atomic<State*> state_ { nullptr };
std::vector<std::thread> threads_;
std::queue<std::function<void()>> tasks_;
std::mutex taskMutex_;
std::condition_variable taskCond_;
std::condition_variable finishCond_;
std::atomic<size_t> threadsCnt_ { 0 };
std::atomic<size_t> taskCnt_ { 0 };
std::atomic_bool exit_ { true };
std::atomic_bool stateProcessing_ { false };
};
inline bool Threadpool::DefaultState::Init() {
bool expected = false;
if (!pool_->stateProcessing_.compare_exchange_strong(expected, true)) {
return false;
}
pool_->state_ = Threadpool::GetStateInstance<InitedState>(pool_);
return true;
}
inline bool Threadpool::InitedState::Start() {
bool expected = false;
if (!pool_->stateProcessing_.compare_exchange_strong(expected, true)) {
return false;
}
pool_->state_ = Threadpool::GetStateInstance<StartedState>(pool_);
return true;
}
inline bool Threadpool::StartedState::Stop() {
bool expected = false;
if (!pool_->stateProcessing_.compare_exchange_strong(expected, true)) {
return false;
}
pool_->state_ = Threadpool::GetStateInstance<InitedState>(pool_);
return true;
}
} // namespace Add
lib boost_unit_test_framework ;
exe threadpool_unit : threadpool_unit.cpp boost_unit_test_framework : <include>../include <linkflags>-pthread ;
#define BOOST_TEST_DYN_LINK
#define BOOST_TEST_MODULE ThrdPool
#include "thread_pool.h"
#include <boost/test/unit_test.hpp>
#include <memory>
#include <chrono>
#include <iostream>
using namespace Add;
BOOST_AUTO_TEST_SUITE(threadpool_test)
BOOST_AUTO_TEST_CASE(threadpool_test_init_and_get_threads_num)
{
auto pool = std::make_unique<Threadpool>();
BOOST_REQUIRE(pool->GetThreadsNumber() == 0);
pool->Init(100);
BOOST_REQUIRE(pool->GetThreadsNumber() == 100);
}
BOOST_AUTO_TEST_CASE(threadpool_test_control_sequence_0)
{
auto pool = std::make_unique<Threadpool>();
BOOST_REQUIRE(pool->Start() == false);
}
BOOST_AUTO_TEST_CASE(threadpool_test_control_sequence_1)
{
auto pool = std::make_unique<Threadpool>();
BOOST_REQUIRE(pool->Stop() == false);
}
BOOST_AUTO_TEST_CASE(threadpool_test_control_sequence_2)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(1);
BOOST_REQUIRE(pool->Start() == true);
BOOST_REQUIRE(pool->Stop() == true);
}
BOOST_AUTO_TEST_CASE(threadpool_test_control_sequence_3)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(1);
BOOST_REQUIRE(pool->Stop() == false);
}
BOOST_AUTO_TEST_CASE(threadpool_test_control_sequence_4)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(1);
BOOST_REQUIRE(pool->Start() == true);
BOOST_REQUIRE(pool->Stop() == true);
BOOST_REQUIRE(pool->Start() == true);
}
BOOST_AUTO_TEST_CASE(threadpool_test_control_sequence_5)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(1);
auto temp_func = [] () {};
BOOST_REQUIRE(pool->PostTask(temp_func) == false);
BOOST_REQUIRE(pool->Start() == true);
BOOST_REQUIRE(pool->PostTask(temp_func) == true);
BOOST_REQUIRE(pool->Stop() == true);
BOOST_REQUIRE(pool->PostTask(temp_func) == false);
}
BOOST_AUTO_TEST_CASE(threadpool_test_one_thread_multi_tasks)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(1);
pool->Start();
std::atomic_int total { 0 };;
auto temp_fun = [&total] () {
total++;
};
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->WaitTasks();
BOOST_REQUIRE(total.load() == 3);
}
BOOST_AUTO_TEST_CASE(threadpool_test_multi_threads_one_task)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(3);
pool->Start();
std::atomic_int total { 0 };;
auto temp_fun = [&total] () {
total++;
};
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->WaitTasks();
BOOST_REQUIRE(total.load() == 1);
}
BOOST_AUTO_TEST_CASE(threadpool_test_multi_threads_multi_tasks_1)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(5);
pool->Start();
std::atomic_int total { 0 };;
auto temp_fun = [&total] () {
total++;
};
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->WaitTasks();
BOOST_REQUIRE(total.load() == 3);
}
BOOST_AUTO_TEST_CASE(threadpool_test_multi_threads_multi_tasks_2)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(3);
pool->Start();
std::atomic_int total { 0 };;
auto temp_fun = [&total] () {
total++;
};
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->WaitTasks();
BOOST_REQUIRE(total.load() == 7);
}
BOOST_AUTO_TEST_CASE(threadpool_test_post_task_when_exiting)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(3);
pool->Start();
std::atomic_int total { 0 };;
auto temp_fun = [&total] () {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
total++;
};
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->Stop();
BOOST_REQUIRE(pool->PostTask(temp_fun) == false);
BOOST_REQUIRE(pool->PostTask(temp_fun) == false);
BOOST_REQUIRE(pool->PostTask(temp_fun) == false);
BOOST_REQUIRE(pool->PostTask(temp_fun) == false);
pool->WaitTasks();
BOOST_REQUIRE(total.load() <= 2);
}
BOOST_AUTO_TEST_CASE(threadpool_test_wait_tasks)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(3);
pool->Start();
std::atomic_int total { 0 };;
auto temp_fun = [&total] () {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
total++;
};
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->WaitTasks();
BOOST_REQUIRE(total.load() == 6);
}
BOOST_AUTO_TEST_CASE(threadpool_test_cancel_tasks)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(1);
pool->Start();
std::atomic_int total { 0 };;
auto temp_fun = [&total] () {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
total++;
};
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->CancelTasks();
BOOST_CHECK(total < 6);
}
BOOST_AUTO_TEST_CASE(threadpool_test_get_tasks_number)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(2);
pool->Start();
std::atomic_int total { 0 };;
auto temp_fun = [&total] () {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
total++;
};
BOOST_REQUIRE(pool->GetTasksNumber() == 0);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->WaitTasks();
BOOST_REQUIRE(pool->GetTasksNumber() == 0);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->WaitTasks();
BOOST_REQUIRE(pool->GetTasksNumber() == 0);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
BOOST_REQUIRE(pool->PostTask(temp_fun) == true);
pool->CancelTasks();
BOOST_CHECK(pool->GetTasksNumber() < 9u);
}
BOOST_AUTO_TEST_CASE(threadpool_multi_init)
{
auto pool = std::make_unique<Threadpool>();
auto thread1 = std::thread([&pool] () {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pool->Init(3);
}
});
auto thread2 = std::thread([&pool] () {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pool->Init(5);
}
});
thread1.join();
thread2.join();
auto threadcnt = pool->GetThreadsNumber();
auto right = (threadcnt == 3u || threadcnt == 5u);
BOOST_REQUIRE(right == true);
}
BOOST_AUTO_TEST_CASE(threadpool_multi_start_stop)
{
auto pool = std::make_unique<Threadpool>();
pool->Init(5);
auto thread1 = std::thread([&pool] () {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pool->Start();
}
});
auto thread2 = std::thread([&pool] () {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pool->Stop();
}
});
thread1.join();
thread2.join();
auto right = (pool->Start() == true || pool->Stop() == true);
BOOST_REQUIRE(right == true);
}
BOOST_AUTO_TEST_CASE(threadpool_multi_init_start_stop)
{
auto pool = std::make_unique<Threadpool>();
auto thread0 = std::thread([&pool] () {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pool->Init(3);
}
});
auto thread1 = std::thread([&pool] () {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pool->Start();
}
});
auto thread2 = std::thread([&pool] () {
for (int i = 0; i < 5; i++) {
std::this_thread::sleep_for(std::chrono::nanoseconds(1));
pool->Stop();
}
});
thread0.join();
thread1.join();
thread2.join();
auto threadCnt = pool->GetThreadsNumber();
auto right = threadCnt == 3u ? (pool->Start() == true || pool->Stop() == true) : (pool->Start() == false && pool->Stop() == false);
BOOST_REQUIRE(right == true);
}
BOOST_AUTO_TEST_SUITE_END()
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册