提交 af18d45d 编写于 作者: 独孤过's avatar 独孤过

update from v2.1.0 to v2.2.0

上级 6da9ddd1
......@@ -40,4 +40,4 @@ Licensed under the Academic Free License version 3.0
15) Right to Use. You may use the Original Work in all ways not otherwise restricted or conditioned by this License or by law, and Licensor promises not to interfere with or be responsible for such uses by You.
16) Modification of This License. This License is Copyright © 2005 Lawrence Rosen. Permission is granted to copy, distribute, or communicate this License without modification. Nothing in this License permits You to modify this License as applied to the Original Work or to Derivative Works. However, You may modify the text of this License and copy, distribute or communicate your modified version (the "Modified License") and apply it to other original works of authorship subject to the following conditions: (i) You may not indicate in any way that your Modified License is the "Academic Free License" or "AFL" and you may not use those names in the name of your Modified License; (ii) You must replace the notice specified in the first paragraph above with the notice "Licensed under <insert your license name here>" or with a notice of your own that is not confusingly similar to the notice in this License; and (iii) You may not claim that your original works are open source software unless your Modified License has been approved by Open Source Initiative (OSI) and You comply with its license review and certification process.
16) Modification of This License. This License is Copyright © 2017 CDU Innovation Studio. Permission is granted to copy, distribute, or communicate this License without modification. Nothing in this License permits You to modify this License as applied to the Original Work or to Derivative Works. However, You may modify the text of this License and copy, distribute or communicate your modified version (the "Modified License") and apply it to other original works of authorship subject to the following conditions: (i) You may not indicate in any way that your Modified License is the "Academic Free License" or "AFL" and you may not use those names in the name of your Modified License; (ii) You must replace the notice specified in the first paragraph above with the notice "Licensed under Academic Free License" or with a notice of your own that is not confusingly similar to the notice in this License; and (iii) You may not claim that your original works are open source software unless your Modified License has been approved by Open Source Initiative (OSI) and You comply with its license review and certification process.
......@@ -7,7 +7,7 @@ Copyright © 2017 CDU Innovation Studio. Some Rights Reserved.
## Source Code
[Core.hpp](./src/Core.hpp)
[Queue.hpp](./src/Queue.hpp)
[DoubleQueue.hpp](./src/DoubleQueue.hpp)
[Condition.hpp](./src/Condition.hpp)
[Thread.h](./src/Thread.h)
[Thread.cpp](./src/Thread.cpp)
......
......@@ -34,6 +34,7 @@
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <atomic>
#include <mutex>
#include <condition_variable>
......@@ -46,7 +47,12 @@ template <typename _Size = std::size_t>
class Condition
{
public:
enum class Strategy { STRICT, RELAXED };
enum class Strategy : std::uint8_t
{
STRICT, RELAXED
};
public:
using Size = _Size;
private:
......@@ -129,8 +135,7 @@ template <typename _Size>
void Condition<_Size>::notify_one(Strategy _strategy)
{
std::unique_lock lock(_mutex);
if (_strategy == Strategy::RELAXED)
lock.unlock();
if (_strategy == Strategy::RELAXED) lock.unlock();
_condition.notify_one();
}
......@@ -139,8 +144,7 @@ template <typename _Size>
void Condition<_Size>::notify_all(Strategy _strategy)
{
std::unique_lock lock(_mutex);
if (_strategy == Strategy::RELAXED)
lock.unlock();
if (_strategy == Strategy::RELAXED) lock.unlock();
_condition.notify_all();
}
......
......@@ -10,9 +10,11 @@
// 拼接
#define SPLICE(front, back) front##back
// 弃用
#define DEPRECATED \
[[deprecated("The name for this item is deprecated.")]]
// 替换
#define REPLACEMENT(signature) \
[[deprecated("The name for this item is deprecated. " \
"Instead, use the name: " STRING(signature) ".")]]
......@@ -25,13 +27,13 @@
ETERFREE_SPACE_BEGIN
//template <typename _Type, const decltype(sizeof(0)) _SIZE>
//constexpr auto size(_Type(&_array)[_SIZE])
//constexpr auto size(_Type(&_array)[_SIZE]) noexcept
//{
// return sizeof _array / sizeof _array[0];
//}
template <typename _Type, const decltype(sizeof(0)) _SIZE>
constexpr auto size(_Type(&_array)[_SIZE])
constexpr auto size(_Type(&_array)[_SIZE]) noexcept
{
return _SIZE;
}
......
/*
* 文件名称:DoubleQueue.hpp
* 语言标准:C++17
*
* 创建日期:2019年03月08日
* 更新日期:2023年01月07日
*
* 摘要
* 1.定义双缓冲队列类模板DoubleQueue。
* 2.包含入口队列和出口队列。在放入元素之时,只锁定入口互斥元;在取出元素之时,先锁定出口互斥元,若出口队列为空,再锁定入口互斥元,并且交换两个队列。
* 以此降低两个队列的相互影响,从而提高出入队列的效率。
* 3.支持自定义队列容量和动态调整容量,支持批量出入队列和清空队列。
*
* 作者:许聪
* 邮箱:solifree@qq.com
*
* 版本:v2.0.0
* 变化
* v2.0.0
* 1.更名Queue为DoubleQueue。
* 2.定制复制语义和移动语义。
*/
#pragma once
#include <utility>
#include <optional>
#include <list>
#include <atomic>
#include <mutex>
#include "Core.hpp"
ETERFREE_SPACE_BEGIN
template <typename _ElementType>
class DoubleQueue
{
public:
using ElementType = _ElementType;
using QueueType = std::list<ElementType>;
using SizeType = typename QueueType::size_type;
using MutexType = std::mutex;
private:
using AtomicType = std::atomic<SizeType>;
private:
AtomicType _capacity;
AtomicType _size;
mutable MutexType _entryMutex;
QueueType _entryQueue;
mutable MutexType _exitMutex;
QueueType _exitQueue;
private:
static auto get(const AtomicType& _atomic) noexcept
{
return _atomic.load(std::memory_order_relaxed);
}
static void set(AtomicType& _atomic, SizeType _size) noexcept
{
_atomic.store(_size, std::memory_order_relaxed);
}
static auto exchange(AtomicType& _atomic, SizeType _size) noexcept
{
return _atomic.exchange(_size, std::memory_order_relaxed);
}
static void copy(DoubleQueue& _left, const DoubleQueue& _right);
static void move(DoubleQueue& _left, DoubleQueue&& _right) noexcept;
private:
auto add(SizeType _size) noexcept
{
return this->_size.fetch_add(_size, \
std::memory_order_relaxed);
}
auto subtract(SizeType _size) noexcept
{
return this->_size.fetch_sub(_size, \
std::memory_order_relaxed);
}
public:
// 若_capacity小于等于零,则无限制,否则其为上限值
DoubleQueue(SizeType _capacity = 0) : \
_capacity(_capacity), _size(0) {}
DoubleQueue(const DoubleQueue& _another);
DoubleQueue(DoubleQueue&& _another);
DoubleQueue& operator=(const DoubleQueue& _another);
DoubleQueue& operator=(DoubleQueue&& _another);
auto capacity() const noexcept
{
return get(_capacity);
}
void reserve(SizeType _capacity) noexcept
{
set(this->_capacity, _capacity);
}
auto size() const noexcept { return get(_size); }
bool empty() const noexcept { return size() == 0; }
DEPRECATED
auto& mutex() noexcept { return _exitMutex; }
std::optional<SizeType> push(const ElementType& _element);
std::optional<SizeType> push(ElementType&& _element);
std::optional<SizeType> push(QueueType& _queue);
std::optional<SizeType> push(QueueType&& _queue);
bool pop(ElementType& _element);
std::optional<ElementType> pop();
bool pop(QueueType& _queue);
SizeType clear();
};
template <typename _ElementType>
void DoubleQueue<_ElementType>::copy(DoubleQueue& _left, \
const DoubleQueue& _right)
{
_left._exitQueue = _right._exitQueue;
_left._entryQueue = _right._entryQueue;
set(_left._size, get(_right._size));
set(_left._capacity, get(_right._capacity));
}
template <typename _ElementType>
void DoubleQueue<_ElementType>::move(DoubleQueue& _left, \
DoubleQueue&& _right) noexcept
{
_left._exitQueue = std::move(_right._exitQueue);
_left._entryQueue = std::move(_right._entryQueue);
set(_left._size, exchange(_right._size, 0));
set(_left._capacity, exchange(_right._capacity, 0));
}
template <typename _ElementType>
DoubleQueue<_ElementType>::DoubleQueue(const DoubleQueue& _another)
{
std::scoped_lock lock(_another._exitMutex, _another._entryMutex);
copy(*this, _another);
}
template <typename _ElementType>
DoubleQueue<_ElementType>::DoubleQueue(DoubleQueue&& _another)
{
std::scoped_lock lock(_another._exitMutex, _another._entryMutex);
move(*this, std::forward<DoubleQueue>(_another));
}
template <typename _ElementType>
auto DoubleQueue<_ElementType>::operator=(const DoubleQueue& _another) \
-> DoubleQueue&
{
if (&_another != this)
{
std::scoped_lock lock(this->_exitMutex, this->_entryMutex, \
_another._exitMutex, _another._entryMutex);
copy(*this, _another);
}
return *this;
}
template <typename _ElementType>
auto DoubleQueue<_ElementType>::operator=(DoubleQueue&& _another) \
-> DoubleQueue&
{
if (&_another != this)
{
std::scoped_lock lock(this->_exitMutex, this->_entryMutex, \
_another._exitMutex, _another._entryMutex);
move(*this, std::forward<DoubleQueue>(_another));
}
return *this;
}
template <typename _ElementType>
auto DoubleQueue<_ElementType>::push(const ElementType& _element) \
-> std::optional<SizeType>
{
std::lock_guard lock(_entryMutex);
if (auto capacity = this->capacity(); \
capacity > 0 && size() >= capacity)
return std::nullopt;
_entryQueue.push_back(_element);
return add(1);
}
template <typename _ElementType>
auto DoubleQueue<_ElementType>::push(ElementType&& _element) \
-> std::optional<SizeType>
{
std::lock_guard lock(_entryMutex);
if (auto capacity = this->capacity(); \
capacity > 0 && size() >= capacity)
return std::nullopt;
_entryQueue.push_back(std::forward<ElementType>(_element));
return add(1);
}
template <typename _ElementType>
auto DoubleQueue<_ElementType>::push(QueueType& _queue) \
-> std::optional<SizeType>
{
std::lock_guard lock(_entryMutex);
if (auto capacity = this->capacity(), \
size = this->size(); \
capacity > 0 && (size >= capacity \
|| _queue.size() >= capacity - size))
return std::nullopt;
auto size = _queue.size();
_entryQueue.splice(_entryQueue.cend(), _queue);
return add(size);
}
template <typename _ElementType>
auto DoubleQueue<_ElementType>::push(QueueType&& _queue) \
-> std::optional<SizeType>
{
std::lock_guard lock(_entryMutex);
if (auto capacity = this->capacity(), \
size = this->size(); \
capacity > 0 && (size >= capacity \
|| _queue.size() >= capacity - size))
return std::nullopt;
auto size = _queue.size();
_entryQueue.splice(_entryQueue.cend(), \
std::forward<QueueType>(_queue));
return add(size);
}
// 支持元素的完全移动语义
template <typename _ElementType>
bool DoubleQueue<_ElementType>::pop(ElementType& _element)
{
std::lock_guard lock(_exitMutex);
if (empty()) return false;
if (_exitQueue.empty())
{
std::lock_guard lock(_entryMutex);
_exitQueue.swap(_entryQueue);
}
subtract(1);
_element = std::move(_exitQueue.front());
_exitQueue.pop_front();
return true;
}
// 编译器RVO机制决定完全移动语义或者移动语义与复制语义
template <typename _ElementType>
auto DoubleQueue<_ElementType>::pop() -> std::optional<ElementType>
{
std::lock_guard lock(_exitMutex);
if (empty()) return std::nullopt;
if (_exitQueue.empty())
{
std::lock_guard lock(_entryMutex);
_exitQueue.swap(_entryQueue);
}
subtract(1);
std::optional result = std::move(_exitQueue.front());
_exitQueue.pop_front();
return result;
}
template <typename _ElementType>
bool DoubleQueue<_ElementType>::pop(QueueType& _queue)
{
std::lock_guard exitLock(_exitMutex);
if (empty()) return false;
_queue.splice(_queue.cend(), _exitQueue);
std::lock_guard entryLock(_entryMutex);
_queue.splice(_queue.cend(), _entryQueue);
set(_size, 0);
return true;
}
template <typename _ElementType>
auto DoubleQueue<_ElementType>::clear() -> SizeType
{
std::scoped_lock lock(_exitMutex, _entryMutex);
_exitQueue.clear();
_entryQueue.clear();
return exchange(_size, 0);
}
ETERFREE_SPACE_END
/*
* 文件名称:Queue.hpp
* 语言标准:C++17
*
* 创建日期:2019年03月08日
* 更新日期:2022年03月20日
*
* 摘要
* 1.定义双缓冲队列类模板Queue。
* 2.支持自定义队列容量,包含入口队列和出口队列,并以交换策略降低二者的相互影响。
* 3.在放入元素之时,只锁定入口互斥元。在取出元素之时,先锁定出口互斥元,若出口队列为空,再锁定入口互斥元,并且交换两个队列。
* 以此降低两个队列的相互影响,从而提高出入队列的效率。
*
* 作者:许聪
* 邮箱:solifree@qq.com
*
* 版本:v1.5.2
* 变化
* v1.5.1
* 1.入队列可选复制语义或者移动语义。
* 2.支持批量出队列。
* 3.新增清空队列方法。
* 4.删除非线程安全函数front和pop,引入线程安全函数pop。
* v1.5.2
* 1.支持动态调整容量。
*/
#pragma once
#include <utility>
#include <optional>
#include <list>
#include <atomic>
#include <mutex>
#include "Core.hpp"
#include "DoubleQueue.hpp"
ETERFREE_SPACE_BEGIN
template <typename _ElementType>
class Queue
{
public:
using ElementType = _ElementType;
using QueueType = std::list<ElementType>;
using SizeType = typename QueueType::size_type;
using MutexType = std::mutex;
private:
using AtomicType = std::atomic<SizeType>;
private:
AtomicType _capacity;
AtomicType _size;
MutexType _entryMutex;
QueueType _entryQueue;
MutexType _exitMutex;
QueueType _exitQueue;
private:
static void set(AtomicType& _atomic, SizeType _size) noexcept
{
_atomic.store(_size, std::memory_order_relaxed);
}
static auto get(const AtomicType& _atomic) noexcept
{
return _atomic.load(std::memory_order_relaxed);
}
private:
auto add(SizeType _size) noexcept
{
return this->_size.fetch_add(_size, \
std::memory_order_relaxed);
}
auto subtract(SizeType _size) noexcept
{
return this->_size.fetch_sub(_size, \
std::memory_order_relaxed);
}
public:
// 若_capacity小于等于零,则无限制,否则其为上限值
Queue(SizeType _capacity = 0)
: _capacity(_capacity), _size(0) {}
auto capacity() const noexcept
{
return get(_capacity);
}
void reserve(SizeType _capacity) noexcept
{
set(this->_capacity, _capacity);
}
auto size() const noexcept { return get(_size); }
bool empty() const noexcept { return size() == 0; }
DEPRECATED
auto& mutex() noexcept { return _exitMutex; }
std::optional<SizeType> push(const ElementType& _element);
std::optional<SizeType> push(ElementType&& _element);
std::optional<SizeType> push(QueueType& _queue);
std::optional<SizeType> push(QueueType&& _queue);
bool pop(ElementType& _element);
std::optional<ElementType> pop();
bool pop(QueueType& _queue);
void clear();
};
template <typename _ElementType>
auto Queue<_ElementType>::push(const ElementType& _element) \
-> std::optional<SizeType>
{
std::lock_guard lock(_entryMutex);
if (auto capacity = this->capacity(); \
capacity > 0 && size() >= capacity)
return std::nullopt;
_entryQueue.push_back(_element);
return add(1);
}
template <typename _ElementType>
auto Queue<_ElementType>::push(ElementType&& _element) \
-> std::optional<SizeType>
{
std::lock_guard lock(_entryMutex);
if (auto capacity = this->capacity(); \
capacity > 0 && size() >= capacity)
return std::nullopt;
_entryQueue.push_back(std::forward<ElementType>(_element));
return add(1);
}
template <typename _ElementType>
auto Queue<_ElementType>::push(QueueType& _queue) \
-> std::optional<SizeType>
{
std::lock_guard lock(_entryMutex);
if (auto capacity = this->capacity(), \
size = this->size(); \
capacity > 0 && (size >= capacity \
|| _queue.size() >= capacity - size))
return std::nullopt;
auto size = _queue.size();
_entryQueue.splice(_entryQueue.cend(), _queue);
return add(size);
}
template <typename _ElementType>
auto Queue<_ElementType>::push(QueueType&& _queue) \
-> std::optional<SizeType>
{
std::lock_guard lock(_entryMutex);
if (auto capacity = this->capacity(), \
size = this->size(); \
capacity > 0 && (size >= capacity \
|| _queue.size() >= capacity - size))
return std::nullopt;
auto size = _queue.size();
_entryQueue.splice(_entryQueue.cend(), \
std::forward<QueueType>(_queue));
return add(size);
}
template <typename _ElementType>
bool Queue<_ElementType>::pop(ElementType& _element)
{
std::lock_guard lock(_exitMutex);
if (empty()) return false;
if (_exitQueue.empty())
{
std::lock_guard lock(_entryMutex);
_exitQueue.swap(_entryQueue);
}
subtract(1);
_element = _exitQueue.front();
_exitQueue.pop_front();
return true;
}
template <typename _ElementType>
auto Queue<_ElementType>::pop() \
-> std::optional<ElementType>
{
std::lock_guard lock(_exitMutex);
if (empty()) return std::nullopt;
if (_exitQueue.empty())
{
std::lock_guard lock(_entryMutex);
_exitQueue.swap(_entryQueue);
}
subtract(1);
std::optional result = _exitQueue.front();
_exitQueue.pop_front();
return result;
}
template <typename _ElementType>
bool Queue<_ElementType>::pop(QueueType& _queue)
{
std::lock_guard exitLock(_exitMutex);
if (empty()) return false;
_queue.splice(_queue.cend(), _exitQueue);
std::lock_guard entryLock(_entryMutex);
_queue.splice(_queue.cend(), _entryQueue);
set(_size, 0);
return true;
}
template <typename _ElementType>
void Queue<_ElementType>::clear()
{
std::scoped_lock lock(_exitMutex, _entryMutex);
_exitQueue.clear();
_entryQueue.clear();
set(_size, 0);
}
using Queue = DoubleQueue<_ElementType>;
ETERFREE_SPACE_END
#include "Thread.h"
#include "Condition.hpp"
#include "Queue.hpp"
#include "DoubleQueue.hpp"
#include <cstdint>
#include <exception>
#include <iostream>
#include <sstream>
#include <atomic>
ETERFREE_SPACE_BEGIN
......@@ -12,7 +14,7 @@ ETERFREE_SPACE_BEGIN
struct Thread::Structure
{
// 状态枚举
enum class State
enum class State : std::uint8_t
{
EMPTY, // 空态
INITIAL, // 初始态
......@@ -23,14 +25,16 @@ struct Thread::Structure
using Condition = Condition<>;
std::thread _thread; // 线程实体
std::atomic<State> _state; // 原子状态
std::mutex _threadMutex; // 线程互斥元
std::thread _thread; // 线程实体
Condition _condition; // 强化条件变量
std::atomic<State> _state; // 原子状态
QueueType _taskQueue; // 任务队列
TaskType _task; // 任务函数子
mutable std::mutex _taskMutex; // 任务互斥元
TaskType _task; // 任务函数子
QueueType _taskQueue; // 任务队列
Callback _callback; // 回调函数子
Structure() : _state(State::EMPTY) {}
......@@ -41,6 +45,12 @@ struct Thread::Structure
return _thread.get_id();
}
// 获取状态
auto getState() const noexcept
{
return _state.load(std::memory_order_relaxed);
}
// 设置状态
void setState(State _state) noexcept
{
......@@ -48,60 +58,65 @@ struct Thread::Structure
std::memory_order_relaxed);
}
// 获取状态
auto getState() const noexcept
// 任务有效性
bool getValidity() const
{
return _state.load(std::memory_order_relaxed);
std::lock_guard lock(_taskMutex);
return static_cast<bool>(_task);
}
// 获取任务
bool getTask(TaskType& _task);
// 设置任务
void setTask(const decltype(_task)& _task)
void setTask(const TaskType& _task)
{
std::lock_guard lock(_taskMutex);
this->_task = _task;
}
void setTask(decltype(_task)&& _task)
void setTask(TaskType&& _task)
{
std::lock_guard lock(_taskMutex);
this->_task = std::move(_task);
}
// 任务有效性
bool getValidity() const
{
std::lock_guard lock(_taskMutex);
return static_cast<bool>(_task);
this->_task = std::forward<TaskType>(_task);
}
};
// 获取任务
bool Thread::setTask(DataType& _data)
bool Thread::Structure::getTask(TaskType& _task)
{
std::lock_guard lock(_taskMutex);
_task = std::move(this->_task);
return static_cast<bool>(_task);
}
// 获取任务
bool Thread::getTask(DataType& _data)
{
// 无任务队列
if (!_data->_taskQueue) return false;
if (!_data->_taskQueue)
return false;
auto result = _data->_taskQueue->pop();
if (result)
{
_data->setState(Structure::State::RUNNABLE);
_data->setTask(result.value());
}
return result.has_value();
decltype(_data->_task) task;
if (!_data->_taskQueue->pop(task) \
|| !task) return false;
_data->setState(Structure::State::RUNNABLE);
_data->setTask(std::move(task));
return true;
}
// 线程主函数
void Thread::execute(DataType _data)
{
// 条件变量的谓词,若任务有效,则无需等待通知
auto predicate = [&_data] \
auto predicate = [&_data]
{ return _data->getValidity(); };
// 若谓词为真,自动解锁互斥元,阻塞线程,直至通知激活,再次锁定互斥元
_data->_condition.wait(predicate);
// 线程退出通道
while (_data->getValidity() \
|| _data->_condition)
while (_data->_condition \
|| _data->getValidity())
{
using State = Structure::State;
_data->setState(State::RUNNING);
......@@ -109,26 +124,27 @@ void Thread::execute(DataType _data)
// 执行函数子之时捕获异常,防止线程泄漏
try
{
// 若任务函数子有效,执行任务
if (_data->_task)
_data->_task();
// 函数子有效则执行任务
if (decltype(_data->_task) task; \
_data->getTask(task)) task();
}
catch (std::exception& exception)
{
std::cerr << exception.what() << std::endl;
std::ostringstream stream;
stream << exception.what() << std::endl;
std::clog << stream.str();
}
// 执行完毕清除任务
_data->_task = nullptr;
// 配置新任务
bool idle = !setTask(_data);
auto callback = _data->_callback;
if (idle) _data->setState(State::BLOCKED);
// 若回调函数子有效,以闲置状态和线程标识为参数,执行回调函数子
// 获取新任务
bool idle = !getTask(_data);
if (idle)
_data->setState(State::BLOCKED);
// 若回调函数子有效,以线程标识和闲置状态为参数,执行回调函数子
if (callback)
callback(idle, _data->getID());
callback(_data->getID(), idle);
// 根据谓词真假,决定是否阻塞线程
_data->_condition.wait(predicate);
......@@ -136,25 +152,26 @@ void Thread::execute(DataType _data)
}
// 默认构造函数
Thread::Thread()
: _data(std::make_shared<Structure>())
Thread::Thread() : \
_data(std::make_shared<Structure>())
{
create();
}
// 默认移动赋值运算符函数
Thread& Thread::operator=(Thread&& _thread)
Thread& Thread::operator=(Thread&& _another)
{
if (&_thread != this)
if (&_another != this)
{
std::scoped_lock lock(_mutex, _thread._mutex);
_data = std::move(_thread._data);
std::scoped_lock lock(this->_mutex, \
_another._mutex);
this->_data = std::move(_another._data);
}
return *this;
}
// 获取线程ID
Thread::ThreadID Thread::getID() const
auto Thread::getID() const -> ThreadID
{
auto data = load();
if (!data) return ThreadID();
......@@ -186,11 +203,12 @@ bool Thread::create()
if (data->getState() != State::EMPTY)
return false;
data->setState(State::INITIAL);
data->_condition.enter();
// 创建std::thread对象,以data为参数,执行函数execute
data->_thread = std::thread(execute, data);
data->setState(State::INITIAL);
return true;
}
......@@ -213,8 +231,8 @@ void Thread::destroy()
data->_thread.join();
// 清空配置项
data->_callback = nullptr;
data->_taskQueue = nullptr;
data->_callback = nullptr;
data->setState(State::EMPTY);
}
......@@ -222,7 +240,7 @@ void Thread::destroy()
bool Thread::configure(const QueueType& _taskQueue, \
const Callback& _callback)
{
// 无任务队列
// 任务队列无效
if (!_taskQueue) return false;
auto data = load();
......@@ -232,7 +250,7 @@ bool Thread::configure(const QueueType& _taskQueue, \
if (!idle()) return false;
data->_taskQueue = _taskQueue; // 配置任务队列,用于自动获取任务
data->_callback = _callback; // 配置回调函数子,执行一次任务,通知守护线程,传递线程闲置状态
data->_callback = _callback; // 配置回调函数子,执行一次任务,通知守护线程,传递线程闲置状态
data->setState(Structure::State::BLOCKED);
return true;
}
......@@ -252,7 +270,26 @@ bool Thread::configure(const TaskType& _task, \
data->setState(Structure::State::RUNNABLE);
data->_callback = _callback; // 配置回调函数子
data->setTask(_task); // 设置任务
data->setTask(_task); // 设置任务函数子
return true;
}
// 配置单任务与回调函数子
bool Thread::configure(TaskType&& _task, \
const Callback& _callback)
{
// 任务无效
if (!_task) return false;
auto data = load();
if (!data) return false;
std::lock_guard lock(data->_threadMutex);
if (!idle()) return false;
data->setState(Structure::State::RUNNABLE);
data->_callback = _callback; // 配置回调函数子
data->setTask(std::forward<TaskType>(_task)); // 设置任务函数子
return true;
}
......@@ -265,10 +302,10 @@ bool Thread::notify()
std::lock_guard lock(data->_threadMutex);
auto state = data->getState();
// 处于阻塞状态则获取任务
// 处于阻塞状态则获取任务
using State = Structure::State;
if (state == State::BLOCKED \
&& setTask(data))
&& getTask(data))
state = State::RUNNABLE;
// 非就绪状态不必通知
......
......@@ -3,26 +3,26 @@
* 语言标准:C++17
*
* 创建日期:2017年09月22日
* 更新日期:2022年03月19
* 更新日期:2023年01月14
*
* 摘要
* 1. 线程类Thread定义于此文件,实现于Thread.cpp。
* 2. Thread提供线程重用方案,支持销毁再创建,一次创建反复使用。
* 3. 线程在创建之后进入阻塞状态,先调用函数configure分配任务,再调用函数notify激活线程。
* 4. 可选配置单任务或者任务队列,以及回调函数子。
* 任务队列用于自动获取任务,回调函数子用于通知线程池,线程执行完单个任务,以及当前闲置状态。
* 5. 执行任务之时捕获异常,防止线程泄漏。
* 6. 线程执行任务之后,倘若配置有任务队列,主动获取任务,否则进入阻塞状态。
* 任务队列用于自动获取任务,回调函数子用于通知线程池已执行完单个任务,以及当前的闲置状态。
* 5. 执行任务之时捕获异常,防止线程泄漏。
* 6. 线程在执行任务之后,倘若配置有任务队列,则主动获取任务,否则进入阻塞状态。
* 倘若获取任务失败,等待分配任务;否则执行新任务,从而提高执行效率。
* 7. 线程在退出之前,倘若配置有任务队列,确保完成所有任务,否则仅执行配置任务。
* 7. 线程在退出之前,倘若配置有任务队列,则确保完成所有任务,否则仅执行配置的单个任务。
* 8. 以原子操作确保接口的线程安全性,以单状态枚举确保接口的执行顺序。
* 9. 引入条件类模板Condition,当激活先于阻塞之时,确保线程正常退出。
* 9. 引入强化条件类模板Condition,当激活先于阻塞之时,确保线程正常退出。
* 10.线程主函数声明为静态成员,除去与类成员指针this的关联性。
*
* 作者:许聪
* 邮箱:solifree@qq.com
*
* 版本:v2.1.0
* 版本:v2.2.0
* 变化
* v2.0.1
* 1.运用Condition的宽松策略,提升激活线程的效率。
......@@ -33,6 +33,10 @@
* 2.消除配置先于回调的隐患。
* v2.1.0
* 1.解决线程在销毁又创建之时的直接退出问题。
* v2.2.0
* 1.配置任务支持复制语义和移动语义。
* 2.解决线程在销毁又创建之时可能出现的状态错误问题。
* 3.判断获取的任务是否有效,以防止线程泄漏。
*/
#pragma once
......@@ -48,7 +52,7 @@
ETERFREE_SPACE_BEGIN
template <typename _Element>
class Queue;
class DoubleQueue;
/*
继承类模板enable_shared_from_this,当Thread被shared_ptr托管,而需要传递this给其它函数之时,
......@@ -56,17 +60,20 @@ class Queue;
不可直接传递裸指针this,否则无法确保shared_ptr的语义,也许会导致已被释放的错误。
不可单独创建另一shared_ptr,否则多个shared_ptr的控制块不同,导致释放多次同一对象。
*/
class Thread
//: public std::enable_shared_from_this<Thread>
class Thread //: public std::enable_shared_from_this<Thread>
{
// 线程数据结构体
struct Structure;
private:
using DataType = std::shared_ptr<Structure>;
public:
using TaskType = std::function<void()>;
using QueueType = std::shared_ptr<Queue<TaskType>>;
using QueueType = std::shared_ptr<DoubleQueue<TaskType>>;
using ThreadID = std::thread::id;
using Callback = std::function<void(bool, ThreadID)>;
using Callback = std::function<void(ThreadID, bool)>;
private:
mutable std::mutex _mutex;
......@@ -74,7 +81,7 @@ private:
private:
// 获取任务
static bool setTask(DataType& _data);
static bool getTask(DataType& _data);
// 线程主函数
static void execute(DataType _data);
......@@ -95,10 +102,10 @@ public:
Thread(const Thread&) = delete;
// 默认移动构造函数
Thread(Thread&& _thread)
Thread(Thread&& _another)
{
std::lock_guard lock(_thread._mutex);
_data = std::move(_thread._data);
std::lock_guard lock(_another._mutex);
this->_data = std::move(_another._data);
}
// 默认析构函数
......@@ -108,7 +115,7 @@ public:
Thread& operator=(const Thread&) = delete;
// 默认移动赋值运算符函数
Thread& operator=(Thread&& _thread);
Thread& operator=(Thread&& _another);
// 获取线程唯一标识
ThreadID getID() const;
......@@ -119,6 +126,7 @@ public:
{
return idle();
}
// 是否闲置
bool idle() const;
......@@ -136,12 +144,17 @@ public:
bool configure(const TaskType& _task, \
const Callback& _callback);
// 配置单任务与回调函数子
bool configure(TaskType&& _task, \
const Callback& _callback);
// 启动线程
REPLACEMENT(notify)
bool start()
{
return notify();
}
// 激活线程
bool notify();
};
......
#include "ThreadPool.h"
#include "Thread.h"
#include "Condition.hpp"
#include "Queue.hpp"
#include "DoubleQueue.hpp"
#include <cstdint>
#include <atomic>
#include <thread>
......@@ -16,8 +17,7 @@ SizeType functor(SizeType _size, Arithmetic _arithmetic) noexcept \
switch (_arithmetic) \
{ \
case Arithmetic::REPLACE: \
field.store(_size, MEMORY_ORDER); \
return _size; \
return field.exchange(_size, MEMORY_ORDER); \
case Arithmetic::INCREASE: \
return field.fetch_add(_size, MEMORY_ORDER); \
case Arithmetic::DECREASE: \
......@@ -31,47 +31,40 @@ SizeType functor(SizeType _size, Arithmetic _arithmetic) noexcept \
struct ThreadPool::Structure
{
// 算术枚举
enum class Arithmetic { REPLACE, INCREASE, DECREASE };
enum class Arithmetic : std::uint8_t
{
REPLACE, // 替换
INCREASE, // 自增
DECREASE // 自减
};
using Condition = Condition<>;
using QueueType = Queue<TaskType>;
using QueueType = DoubleQueue<TaskType>;
using Callback = Thread::Callback;
std::list<Thread> _threadTable; // 线程表
std::shared_ptr<QueueType> _taskQueue; // 任务队列
Callback _callback; // 回调函数子
std::thread _thread; // 守护线程
Condition _condition; // 强化条件变量
std::thread _thread; // 守护线程
std::list<Thread> _threadTable; // 线程表
std::atomic<SizeType> _capacity; // 线程池容量
std::atomic<SizeType> _size; // 线程数量
std::atomic<SizeType> _totalSize; // 总线程数量
std::atomic<SizeType> _idleSize; // 闲置线程数量
std::shared_ptr<QueueType> _taskQueue; // 任务队列
Callback _callback; // 回调函数子
// 过滤任务
template <typename _TaskQueue>
static auto filterTask(_TaskQueue& _taskQueue);
/*
* 默认构造函数
* 若先以运算符new创建实例,再交由共享指针std::shared_ptr托管,
* 则至少二次分配内存,先为实例分配内存,再为共享指针的控制块分配内存。
* 而std::make_shared典型地仅分配一次内存,实例内存和控制块内存连续。
*/
Structure()
: _taskQueue(std::make_shared<QueueType>()) {}
// 过滤任务
template <typename _TaskQueue>
static auto filterTask(_TaskQueue& _taskQueue);
// 放入任务
bool pushTask(const TaskType& _task);
bool pushTask(TaskType&& _task);
// 批量放入任务
bool pushTask(TaskQueue& _taskQueue);
bool pushTask(TaskQueue&& _taskQueue);
// 设置线程池容量
void setCapacity(SizeType _capacity, \
bool _notified = false);
Structure() : \
_taskQueue(std::make_shared<QueueType>()) {}
// 获取线程池容量
auto getCapacity() const noexcept
......@@ -79,23 +72,34 @@ struct ThreadPool::Structure
return _capacity.load(std::memory_order_relaxed);
}
// 设置线程
SET_ATOMIC(SizeType, Arithmetic, setSize, this->_size);
// 设置线程池容
void setCapacity(SizeType _capacity, bool _notified = false);
// 获取线程数量
auto getSize() const noexcept
// 获取线程数量
auto getTotalSize() const noexcept
{
return _size.load(std::memory_order_relaxed);
return _totalSize.load(std::memory_order_relaxed);
}
// 设置闲置线程数量
SET_ATOMIC(SizeType, Arithmetic, setIdleSize, _idleSize);
// 设置线程数量
SET_ATOMIC(SizeType, Arithmetic, setTotalSize, _totalSize);
// 获取闲置线程数量
auto getIdleSize() const noexcept
{
return _idleSize.load(std::memory_order_relaxed);
}
// 设置闲置线程数量
SET_ATOMIC(SizeType, Arithmetic, setIdleSize, _idleSize);
// 放入任务
bool pushTask(const TaskType& _task);
bool pushTask(TaskType&& _task);
// 批量放入任务
bool pushTask(TaskQueue& _taskQueue);
bool pushTask(TaskQueue&& _taskQueue);
};
#undef SET_ATOMIC
......@@ -117,7 +121,17 @@ auto ThreadPool::Structure::filterTask(_TaskQueue& _taskQueue)
return size;
}
// 放入单任务
// 设置线程池容量
void ThreadPool::Structure::setCapacity(SizeType _capacity, \
bool _notified)
{
auto capacity = this->_capacity.exchange(_capacity, \
std::memory_order_relaxed);
if (_notified && capacity != _capacity)
_condition.notify_one(Condition::Strategy::RELAXED);
}
// 放入任务
bool ThreadPool::Structure::pushTask(const TaskType& _task)
{
// 若放入任务之前,任务队列为空,则通知守护线程
......@@ -127,7 +141,7 @@ bool ThreadPool::Structure::pushTask(const TaskType& _task)
return result.has_value();
}
// 放入任务
// 放入任务
bool ThreadPool::Structure::pushTask(TaskType&& _task)
{
// 若放入任务之前,任务队列为空,则通知守护线程
......@@ -163,54 +177,50 @@ bool ThreadPool::Structure::pushTask(TaskQueue&& _taskQueue)
return result.has_value();
}
// 设置线程池容量
void ThreadPool::Structure::setCapacity(SizeType _capacity, \
bool _notified)
// 获取线程池容量
auto ThreadPool::Proxy::getCapacity() const noexcept \
-> SizeType
{
auto capacity = this->_capacity.exchange(_capacity, \
std::memory_order_relaxed);
if (_notified && capacity != _capacity)
_condition.notify_one(Condition::Strategy::RELAXED);
return _data ? _data->getCapacity() : 0;
}
// 设置线程池容量
void ThreadPool::Proxy::setCapacity(SizeType _capacity)
bool ThreadPool::Proxy::setCapacity(SizeType _capacity)
{
if (_capacity > 0 && _data)
_data->setCapacity(_capacity, true);
}
if (_capacity <= 0 || !_data) return false;
// 获取线程池容量
ThreadPool::SizeType ThreadPool::Proxy::getCapacity() const noexcept
{
return _data ? _data->getCapacity() : 0;
_data->setCapacity(_capacity, true);
return true;
}
// 获取线程数量
ThreadPool::SizeType ThreadPool::Proxy::getSize() const noexcept
// 获取总线程数量
auto ThreadPool::Proxy::getTotalSize() const noexcept \
-> SizeType
{
return _data ? _data->getSize() : 0;
return _data ? _data->getTotalSize() : 0;
}
// 获取闲置线程数量
ThreadPool::SizeType ThreadPool::Proxy::getIdleSize() const noexcept
auto ThreadPool::Proxy::getIdleSize() const noexcept \
-> SizeType
{
return _data ? _data->getIdleSize() : 0;
}
// 获取任务数量
ThreadPool::SizeType ThreadPool::Proxy::getTaskSize() const noexcept
auto ThreadPool::Proxy::getTaskSize() const noexcept \
-> SizeType
{
return _data ? _data->_taskQueue->size() : 0;
}
// 放入任务
// 放入任务
bool ThreadPool::Proxy::pushTask(const TaskType& _task)
{
return _task && _data && _data->pushTask(_task);
}
// 放入任务
// 放入任务
bool ThreadPool::Proxy::pushTask(TaskType&& _task)
{
return _task && _data \
......@@ -230,7 +240,7 @@ bool ThreadPool::Proxy::pushTask(TaskQueue&& _taskQueue)
&& _data->pushTask(std::forward<TaskQueue>(_taskQueue));
}
// 批量出任务
// 批量出任务
bool ThreadPool::Proxy::popTask(TaskQueue& _taskQueue)
{
return _data && _data->_taskQueue->pop(_taskQueue);
......@@ -249,8 +259,7 @@ void ThreadPool::create(DataType&& _data, SizeType _capacity)
using Arithmetic = Structure::Arithmetic;
// 定义回调函数子
_data->_callback = \
[_data = std::weak_ptr(_data)](bool _idle, Thread::ThreadID _id) \
_data->_callback = [_data = std::weak_ptr(_data)](Thread::ThreadID _id, bool _idle)
{
// 线程并非闲置状态
if (!_idle) return;
......@@ -271,7 +280,7 @@ void ThreadPool::create(DataType&& _data, SizeType _capacity)
}
_data->setCapacity(_capacity); // 设置线程池容量
_data->setSize(_capacity, Arithmetic::REPLACE); // 设置线程数量
_data->setTotalSize(_capacity, Arithmetic::REPLACE); // 设置总线程数量
_data->setIdleSize(_capacity, Arithmetic::REPLACE); // 设置闲置线程数量
// 创建std::thread对象,即守护线程,以_data为参数,执行函数execute
......@@ -296,14 +305,14 @@ void ThreadPool::destroy(DataType&& _data)
using Arithmetic = Structure::Arithmetic;
_data->setCapacity(0); // 设置线程池容量
_data->setSize(0, Arithmetic::REPLACE); // 设置线程数量
_data->setTotalSize(0, Arithmetic::REPLACE); // 设置总线程数量
_data->setIdleSize(0, Arithmetic::REPLACE); // 设置闲置线程数量
}
// 调整线程数量
ThreadPool::SizeType ThreadPool::adjust(DataType& _data)
{
auto size = _data->getSize();
auto size = _data->getTotalSize();
auto capacity = _data->getCapacity();
// 1.删减线程
......@@ -322,8 +331,8 @@ ThreadPool::SizeType ThreadPool::adjust(DataType& _data)
using Arithmetic = Structure::Arithmetic;
// 增加线程数量
_data->setSize(size, Arithmetic::INCREASE);
// 增加线程数量
_data->setTotalSize(size, Arithmetic::INCREASE);
// 增加闲置线程数量
_data->setIdleSize(size, Arithmetic::INCREASE);
......@@ -335,19 +344,19 @@ void ThreadPool::execute(DataType _data)
{
/*
* 条件变量的谓词,不必等待通知的条件
* 1.存在闲置线程并且任务队列非空
* 2.存在闲置线程并且需要删减线程。
* 1.强化条件变量无效
* 2.任务队列非空并且存在闲置线程。
* 3.任务队列非空并且需要增加线程。
* 4.条件无效
* 4.存在闲置线程并且需要删减线程
*/
auto predicate = [&_data] \
auto predicate = [&_data]
{
bool idle = _data->getIdleSize() > 0;
bool empty = _data->_taskQueue->empty();
auto size = _data->getSize();
bool idle = _data->getIdleSize() > 0;
auto size = _data->getTotalSize();
auto capacity = _data->getCapacity();
return idle && (!empty || size > capacity) \
|| !empty && size < capacity;
return !empty && (idle || size < capacity) \
|| idle && size > capacity;
};
// 若谓词非真,自动解锁互斥元,阻塞守护线程,直至通知激活,再次锁定互斥元
......@@ -377,7 +386,7 @@ void ThreadPool::execute(DataType _data)
{
iterator = _data->_threadTable.erase(iterator);
_data->setIdleSize(1, Arithmetic::DECREASE);
_data->setSize(1, Arithmetic::DECREASE);
_data->setTotalSize(1, Arithmetic::DECREASE);
--size;
continue;
}
......@@ -401,49 +410,55 @@ ThreadPool::SizeType ThreadPool::getConcurrency() noexcept
}
// 默认构造函数
ThreadPool::ThreadPool(SizeType _size, SizeType _capacity)
: _data(std::make_shared<Structure>())
ThreadPool::ThreadPool(SizeType _capacity) : \
_data(std::make_shared<Structure>())
{
create(load(), _capacity);
}
// 构造函数
ThreadPool::ThreadPool(SizeType _size, \
SizeType _capacity) : \
_data(std::make_shared<Structure>())
{
create(load(), _capacity);
}
// 默认移动赋值运算符函数
ThreadPool& ThreadPool::operator=(ThreadPool&& _threadPool)
ThreadPool& ThreadPool::operator=(ThreadPool&& _another)
{
if (&_threadPool != this)
if (&_another != this)
{
std::scoped_lock lock(_mutex, _threadPool._mutex);
_data = std::move(_threadPool._data);
std::scoped_lock lock(this->_mutex, _another._mutex);
this->_data = std::move(_another._data);
}
return *this;
}
// 获取代理
ThreadPool::Proxy ThreadPool::getProxy()
// 获取线程池容量
ThreadPool::SizeType ThreadPool::getCapacity() const
{
return load();
auto data = load();
return data ? data->getCapacity() : 0;
}
// 设置线程池容量
void ThreadPool::setCapacity(SizeType _capacity)
bool ThreadPool::setCapacity(SizeType _capacity)
{
if (_capacity > 0)
if (auto data = load())
{
data->setCapacity(_capacity, true);
return true;
}
return false;
}
// 获取线程池容量
ThreadPool::SizeType ThreadPool::getCapacity() const
{
auto data = load();
return data ? data->getCapacity() : 0;
}
// 获取线程数量
ThreadPool::SizeType ThreadPool::getSize() const
// 获取总线程数量
ThreadPool::SizeType ThreadPool::getTotalSize() const
{
auto data = load();
return data ? data->getSize() : 0;
return data ? data->getTotalSize() : 0;
}
// 获取闲置线程数量
......@@ -460,7 +475,7 @@ ThreadPool::SizeType ThreadPool::getTaskSize() const
return data ? data->_taskQueue->size() : 0;
}
// 放入任务
// 放入任务
bool ThreadPool::pushTask(const TaskType& _task)
{
// 过滤无效任务
......@@ -470,7 +485,7 @@ bool ThreadPool::pushTask(const TaskType& _task)
return data && data->pushTask(_task);
}
// 放入任务
// 放入任务
bool ThreadPool::pushTask(TaskType&& _task)
{
// 过滤无效任务
......@@ -496,7 +511,7 @@ bool ThreadPool::pushTask(TaskQueue&& _taskQueue)
&& data->pushTask(std::forward<TaskQueue>(_taskQueue));
}
// 批量出任务
// 批量出任务
bool ThreadPool::popTask(TaskQueue& _taskQueue)
{
auto data = load();
......@@ -510,4 +525,10 @@ void ThreadPool::clearTask()
data->_taskQueue->clear();
}
// 获取代理
ThreadPool::Proxy ThreadPool::getProxy()
{
return load();
}
ETERFREE_SPACE_END
......@@ -3,28 +3,28 @@
* 语言标准:C++17
*
* 创建日期:2017年09月22日
* 更新日期:2022年03月17
* 更新日期:2023年01月20
*
* 摘要
* 1.线程池类ThreadPool定义于此文件,实现于ThreadPool.cpp。
* 2.当任务队列为空,阻塞守护线程,在新增任务之时,激活守护线程,通知线程获取任务。
* 3.当无闲置线程,阻塞守护线程,当存在闲置线程,激活守护线程,通知闲置线程获取任务。
* 4.当销毁线程池,等待守护线程退出,而守护线程在退出之前,等待所有线程退出。
* 线程在退出之前,默认完成任务队列的所有任务,可选弹出所有任务或者清空队列,用以支持线程立即退出。
* 2.当无任务时,阻塞守护线程;当新增任务时,激活守护线程,通知线程获取任务。
* 3.当无闲置线程时,阻塞守护线程;当存在闲置线程时,激活守护线程,通知闲置线程获取任务。
* 4.当销毁线程池,等待守护线程退出,而守护线程在退出之前,等待所有线程退出。
* 线程在退出之前,默认执行任务队列的所有任务。可选取出所有任务或者清空队列,以支持线程立即退出。
* 5.提供增删线程策略,由守护线程增删线程。
* 在任务队列非空之时,一次性增加线程,当存在闲置线程之时,逐个删减线程。
* 在任务队列非空之时,一次性增加线程;在存在闲置线程之时,逐个删减线程。
* 6.以原子操作确保接口的线程安全性,并且新增成员类Proxy,用于减少原子操作,针对频繁操作提升性能。
* 7.引入双缓冲队列类模板Queue,降低读写任务的相互影响,提高放入和取出任务的效率
* 8.引入条件类模板Condition,当激活先于阻塞之时,确保守护线程正常退出。
* 9.守护线程主函数声明为静态成员,除去与类成员指针this的关联性
* 7.守护线程主函数声明为静态成员,除去与类成员指针this的关联性
* 8.引入强化条件类模板Condition,当激活先于阻塞时,确保守护线程正常退出。
* 9.引入双缓冲队列类模板DoubleQueue,提高放入和取出任务的效率
*
* 作者:许聪
* 邮箱:solifree@qq.com
*
* 版本:v2.1.0
* 版本:v2.2.0
* 变化
* v2.0.1
* 1.运用Condition的宽松策略,提升激活守护线程的效率
* 1.运用Condition的宽松策略,提升激活守护线程的性能
* v2.0.2
* 1.消除谓词对条件实例有效性的重复判断。
* v2.0.3
......@@ -36,9 +36,12 @@
* 3.新增任务可选复制语义或者移动语义。
* v2.1.0
* 1.修复线程池扩容问题。
* 由于未增加线程数量,因此无限创建线程;同时未增加闲置线程数量,守护线程无法调度新线程执行任务,即线程泄漏。
* 由于未增加线程数量,因此无限创建线程;同时未增加闲置线程数量,守护线程无法调度新线程执行任务,即线程泄漏。
* 2.修复线程池缩容问题。
* 在延迟删减线程之时,未减少线程数量,导致线程池反复删减线程,直至线程池为空。
* 在延迟删减线程之时,未减少总线程数量,导致线程池反复删减线程,直至线程池为空。
* v2.2.0
* 1.完善代码风格。
* 2.设置线程池容量函数返回合理值。
*/
#pragma once
......@@ -55,12 +58,17 @@ ETERFREE_SPACE_BEGIN
class ThreadPool
{
// 线程池数据结构体
struct Structure;
using DataType = std::shared_ptr<Structure>;
public:
// 线程池代理类
class Proxy;
private:
using DataType = std::shared_ptr<Structure>;
public:
using TaskType = std::function<void()>;
using TaskQueue = std::list<TaskType>;
using SizeType = TaskQueue::size_type;
......@@ -97,17 +105,20 @@ private:
public:
// 默认构造函数
ThreadPool(SizeType _size = getConcurrency(), \
SizeType _capacity = getConcurrency());
ThreadPool(SizeType _capacity = getConcurrency());
// 构造函数
DEPRECATED
ThreadPool(SizeType _size, SizeType _capacity);
// 删除默认复制构造函数
ThreadPool(const ThreadPool&) = delete;
// 默认移动构造函数
ThreadPool(ThreadPool&& _threadPool)
ThreadPool(ThreadPool&& _another)
{
std::lock_guard lock(_threadPool._mutex);
_data = std::move(_threadPool._data);
std::lock_guard lock(_another._mutex);
this->_data = std::move(_another._data);
}
// 默认析构函数
......@@ -122,20 +133,7 @@ public:
ThreadPool& operator=(const ThreadPool&) = delete;
// 默认移动赋值运算符函数
ThreadPool& operator=(ThreadPool&& _threadPool);
// 获取代理
Proxy getProxy();
// 设置线程最大数量
REPLACEMENT(setCapacity)
void setMaxThreads(SizeType _capacity)
{
setCapacity(_capacity);
}
// 设置线程池容量
void setCapacity(SizeType _capacity);
ThreadPool& operator=(ThreadPool&& _another);
// 获取线程最大数量
REPLACEMENT(getCapacity)
......@@ -147,22 +145,39 @@ public:
// 获取线程池容量
SizeType getCapacity() const;
// 设置线程数量
DEPRECATED
bool setThreads(SizeType size)
// 设置线程最大数量
REPLACEMENT(setCapacity)
bool setMaxThreads(SizeType _capacity)
{
return false;
return setCapacity(_capacity);
}
// 设置线程池容量
bool setCapacity(SizeType _capacity);
// 获取线程数量
REPLACEMENT(getSize)
REPLACEMENT(getTotalSize)
auto getThreads() const
{
return getSize();
return getTotalSize();
}
// 获取线程数量
SizeType getSize() const;
REPLACEMENT(getTotalSize)
auto getSize() const
{
return getTotalSize();
}
// 获取总线程数量
SizeType getTotalSize() const;
// 设置线程数量
DEPRECATED
bool setThreads(SizeType size)
{
return false;
}
// 获取空闲线程数量
REPLACEMENT(getIdleSize)
......@@ -200,22 +215,20 @@ public:
return pushTask(TaskType(std::forward<_Functor>(_functor)));
}
template <typename _Functor, typename... _Args>
bool pushTask(_Functor&& _functor, _Args&&... _args)
{
auto functor = std::bind(std::forward<_Functor>(_functor), \
std::forward<_Args>(_args)...);
return pushTask(TaskType(functor));
}
bool pushTask(_Functor&& _functor, _Args&&... _args);
// 批量放入任务
bool pushTask(TaskQueue& _taskQueue);
bool pushTask(TaskQueue&& _taskQueue);
// 批量出任务
// 批量出任务
bool popTask(TaskQueue& _taskQueue);
// 清空任务
void clearTask();
// 获取代理
Proxy getProxy();
};
class ThreadPool::Proxy
......@@ -223,22 +236,29 @@ class ThreadPool::Proxy
DataType _data;
public:
Proxy(const decltype(_data)& _data) noexcept
: _data(_data) {}
Proxy(const decltype(_data)& _data) noexcept : \
_data(_data) {}
explicit operator bool() const noexcept
{
return static_cast<bool>(_data);
}
// 设置线程池容量
void setCapacity(SizeType _capacity);
// 获取线程池容量
SizeType getCapacity() const noexcept;
// 设置线程池容量
bool setCapacity(SizeType _capacity);
// 获取线程数量
SizeType getSize() const noexcept;
REPLACEMENT(getTotalSize)
auto getSize() const noexcept
{
return getTotalSize();
}
// 获取总线程数量
SizeType getTotalSize() const noexcept;
// 获取闲置线程数量
SizeType getIdleSize() const noexcept;
......@@ -262,22 +282,33 @@ public:
return pushTask(TaskType(std::forward<_Functor>(_functor)));
}
template <typename _Functor, typename... _Args>
bool pushTask(_Functor&& _functor, _Args&&... _args)
{
auto functor = std::bind(std::forward<_Functor>(_functor), \
std::forward<_Args>(_args)...);
return pushTask(TaskType(functor));
}
bool pushTask(_Functor&& _functor, _Args&&... _args);
// 批量放入任务
bool pushTask(TaskQueue& _taskQueue);
bool pushTask(TaskQueue&& _taskQueue);
// 批量出任务
// 批量出任务
bool popTask(TaskQueue& _taskQueue);
// 清空任务
void clearTask();
};
template <typename _Functor, typename... _Args>
bool ThreadPool::Proxy::pushTask(_Functor&& _functor, _Args&&... _args)
{
auto functor = std::bind(std::forward<_Functor>(_functor), \
std::forward<_Args>(_args)...);
return pushTask(TaskType(functor));
}
template <typename _Functor, typename... _Args>
bool ThreadPool::pushTask(_Functor&& _functor, _Args&&... _args)
{
auto functor = std::bind(std::forward<_Functor>(_functor), \
std::forward<_Args>(_args)...);
return pushTask(TaskType(functor));
}
ETERFREE_SPACE_END
......@@ -45,21 +45,22 @@ static void execute(ThreadPool& _threadPool)
ThreadPool::TaskQueue taskQueue;
for (auto index = 0UL; index < 30000UL; ++index)
taskQueue.push_back(task);
_threadPool.pushTask(std::move(taskQueue));
proxy.pushTask(std::move(taskQueue));
}
static void terminate(ThreadPool&& _threadPool)
{
_threadPool.clearTask();
auto threadPool(std::move(_threadPool));
auto threadPool(std::forward(_threadPool));
(void)threadPool;
}
#elif defined BOOST
static auto getConcurrency() noexcept
{
auto concurrency = std::thread::hardware_concurrency();
return concurrency > 0 ? concurrency \
: static_cast<decltype(concurrency)>(1);
return concurrency > 0 ? \
concurrency : static_cast<decltype(concurrency)>(1);
}
using ThreadPool = boost::threadpool::thread_pool<>;
......@@ -75,7 +76,8 @@ static void execute(ThreadPool& _threadPool)
static void terminate(ThreadPool&& _threadPool)
{
auto threadPool(std::move(_threadPool));
auto threadPool(std::forward(_threadPool));
(void)threadPool;
}
#endif
......@@ -83,24 +85,24 @@ int main()
{
using std::cout, std::endl;
constexpr auto load = []() noexcept \
constexpr auto load = []() noexcept
{ return counter.load(std::memory_order_relaxed); };
#ifdef FILE_STREAM
constexpr auto file = "ThreadPool.log";
constexpr auto FILE = "ThreadPool.log";
#ifdef FILE_SYSTEM
std::filesystem::remove(file);
std::filesystem::remove(FILE);
#endif
std::ofstream ofs(file, std::ios::app);
std::ofstream ofs(FILE, std::ios::app);
auto os = cout.rdbuf(ofs.rdbuf());
#endif
#if defined ETERFREE
eterfree::ThreadPool threadPool;
ThreadPool threadPool;
#elif defined BOOST
boost::threadpool::thread_pool threadPool(getConcurrency());
ThreadPool threadPool(getConcurrency());
#endif
using namespace std::chrono;
......@@ -118,7 +120,7 @@ int main()
#ifdef FILE_STREAM
cout << endl;
std::cout.rdbuf(os);
cout.rdbuf(os);
#endif
terminate(std::move(threadPool));
......
......@@ -21,7 +21,7 @@ static void task()
static void print(const ThreadPool& _threadPool)
{
std::cout << _threadPool.getCapacity() << ' ' \
<< _threadPool.getSize() << ' ' \
<< _threadPool.getTotalSize() << ' ' \
<< _threadPool.getIdleSize() << ' ' \
<< _threadPool.getTaskSize() << std::endl;
}
......@@ -33,20 +33,20 @@ int main()
auto capacity = proxy.getCapacity();
for (decltype(capacity) index = 0; \
index < capacity; ++index)
threadPool.pushTask(task);
proxy.pushTask(task);
using namespace std::this_thread;
using namespace std::chrono;
sleep_for(seconds(2));
print(threadPool);
threadPool.pushTask([] \
proxy.pushTask([] \
{ std::cout << "eterfree::ThreadPool" << std::endl; });
sleep_for(seconds(1));
print(threadPool);
threadPool.setCapacity(capacity + 1);
proxy.setCapacity(capacity + 1);
sleep_for(seconds(2));
print(threadPool);
......
......@@ -3,25 +3,21 @@
#include <cstdlib>
#include <iostream>
USING_ETERFREE_SPACE
int main()
{
using std::cout, std::boolalpha, std::endl;
Thread thread;
eterfree::Thread thread;
cout << thread.getID() << endl;
thread.configure([] \
{ cout << "Eterfree" << endl; }, nullptr);
thread.configure([] { cout << "Eterfree" << endl; }, nullptr);
cout << boolalpha << thread.notify() << endl;
thread.destroy();
thread.create();
cout << thread.getID() << endl;
thread.configure([] \
{ cout << "solifree" << endl; }, nullptr);
thread.configure([] { cout << "solifree" << endl; }, nullptr);
cout << boolalpha << thread.notify() << endl;
return EXIT_SUCCESS;
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册