Thread

class Thread

class paddle::Thread

A simple wrapper for std::thread

Subclassed by paddle::SocketServer, paddle::SocketWorker, paddle::ThreadWorker

Public Functions

Thread()

Construct Function. Default thread pointer is null.

virtual ~Thread()
void start()

Creat a new thread and call run() function.

void detach()

Detach the thread. It don’t need to be waited until it finish.

void join()

Join the thread. It should be waited until it finish.

virtual void run() = 0

Define what to be done on this thread through override this function.

Protected Attributes

std::unique_ptr<std::thread> thread_

class ThreadWorker

class paddle::ThreadWorker

ThreadWorker maintains a job queue. It executes the jobs in the job queue sequentianlly in a separate thread.

Use addJob() to add a new job to the job queue.

Inherits from paddle::Thread

Public Types

typedef std::function<void()> JobFunc

Public Functions

ThreadWorker()

Construct Function. Default size of job queue is 0 and not stopping.

~ThreadWorker()

Destruct Function. If it’s running, wait until all job finish and then stop it.

void stop()

Finish current running job and quit the thread.

void addJob(JobFunc func)

Add a new job to the job queue.

void wait()

Wait until all jobs was done (the job queue was empty).

Protected Functions

virtual void run()

Execute jobs in the job queue sequentianlly,.

Note
If finish all the jobs in the job queue, notifies all the waiting threads the job queue was empty.

Protected Attributes

Queue<JobFunc> jobs_
bool stopping_
LockedCondition finishCV_
bool empty_

class SyncThreadPool

class paddle::SyncThreadPool

SyncThreadPool maintains a pool of threads. It executes the job use all workers in the pool.

Use exec() to run a new job, job complete when exec returned. Only one job can exec simultaneously.

Each worker has an tid whose range is [0, getNumThreads()). JobFunc can use tid to divide input data.

Public Types

typedef std::function<void(int tid, size_t numThreads)> JobFunc

Public Functions

SyncThreadPool()

Construct Function. No thread will be created.

SyncThreadPool(size_t numWorkers, bool checkOwner = true)

Construct Fucntion. Create numWorkers of threads in the pool.

Parameters
  • numWorkers: Number of the workers in the pool.
  • checkOwner: Default true. If checkOwner is true, this sync thread pool should be used by it’s owner thread.

~SyncThreadPool()
size_t getNumThreads()

Return num of threads in the pool.

void exec(JobFunc jobFunc, JobFunc ownerFunc = nullptr)

Execute a job using all the theads in the pool.

Note
For the ownerFunc, tid=getNumThreads().
Parameters
  • jobFunc: The function to be executed.
  • ownerFunc: Owner thread can do something in owerFunc when job executing.

void execPlusOwner(JobFunc jobFunc)

Execute a job using all the threads in the pool. And the owner thread will do the same job.

Note
Assume that JobFunc will execute numThread + 1 times, with tid ranging [0,numThread]. The thread whose tid is numThread is the owner thread.
Parameters
  • jobFunc: The job to be executed.

Public Static Functions

static void execHelper(SyncThreadPool *pool, JobFunc jobFunc)

Execute a job if has pool, else use caller thread as a worker.

Parameters
  • pool: The pool to execute the job.
  • jobFunc: The job to be excuted.

Protected Functions

void start()

Start all the workers in the pool, call their run() function.

void stop()

Stop all the workers in the pool.

void run(int tid)

Execute the jobFunc_ using the worker thread tid, if not stopping.

Protected Attributes

pid_t ownerThreadId_
bool stopping_
ThreadBarrier jobStartBarrier_
ThreadBarrier jobFinishBarrier_
JobFunc jobFunc_
bool checkOwner_
std::vector<std::unique_ptr<std::thread>> workers_

class MultiThreadWorker

template <class T>
class paddle::MultiThreadWorker

MultiThreadWorker maintains a job queue and a result queue. It executes the jobs in the job queue and puts the results into the result queue sequentially in multi separate threads.

Add jobs:

Use addJob() to add a new job to the job queue (the user added jobs should not return nullptr).

Use stopAddJob() to stop adding new jobs to the job queue (addJob() can not be called after stopAddJob()).

Normal stop:

Use waitResult() to get the results until nullptr is returned. Use stop() to exit normally (stopAddJob() should be called first).

Force stop:

Use forceStop() to exit forcibly even though there are remaining jobs in the job queue.

Public Types

typedef T ResultType
typedef std::shared_ptr<ResultType> ResultPtrType
typedef std::function<ResultPtrType()> JobFunc

Public Functions

MultiThreadWorker(size_t workerNum, size_t queueCapacity)

Construct Function. Initialize the multithread worker.

Parameters
  • workerNum: Number of the workers.
  • queueCapacity: Capapcity of the result queue.

virtual ~MultiThreadWorker()

Destruct Function. Force stop the workers even though there are remaining jobs in the job queue.

void stop()

Stop all the workers normally.

Note
stopAddJob() should be called before it.

void forceStop()

Stop all the workers forcibly.

Note
This function will call stopAddJob() first and empty the result queue.

void addJob(JobFunc func)

Add a job to the job queue.

Note
Job can not be added after calling stopAddJob().

void stopAddJob()

Stop adding new jobs to the job queue.

Note
This fuction enqueue a return nullptr function to the job queue.

ResultPtrType waitResult()

Dequeue the first result in the result queue and return it.

Note
If the result queue is empty, wait until it’s not empty or return nullptr if all the results have been returned.

bool testResult()

The result queue is empty or not.

Return
true if empty.

Protected Functions

virtual void run()

Do the jobs in the job queue sequentianlly and enqueue the result into the result queue.

Note
A nullptr will be enqueued into the resulte queue, when a worker finished.

Protected Attributes

bool stopping_
bool jobAdding_
size_t nullResultNum_
Queue<JobFunc> jobs_
BlockingQueue<ResultPtrType> results_
std::vector<std::unique_ptr<std::thread>> workers_

class AsyncThreadPool

class paddle::AsyncThreadPool

AsyncThreadPool maintains a job queue and threads pool. It executes the jobs from queue asynchronously.

Add jobs:

Use addJob() to add a new job to the job queue and get a std::future result. The caller’s thread continues running. Call std::future::get() when the result’s value is needed, and the caller’s thread may be blocked until thread-pool finished the job.

Use addBatchJobs() to add a batch of jobs. Unlike addJob()‘s asynchronization, addBatchJobs will block caller’s thread until all jobs in the batch are finished.

Stop: Use stop() to stop the thread pool. Job can be added once stopped.

Process-wide Singleton: Use AsyncThreadPool::ProcessChannel(N) first to create N threads. Then call AsyncThreadPool::ProcessChannel() to get the process-wide global thread pool.

Public Types

typedef std::function<void()> JobFunc

Public Functions

AsyncThreadPool()
AsyncThreadPool(size_t threadNum)

Construct Function. Install all the workers.

Parameters
  • threadNum: Number of the threads, must greater than 1.

~AsyncThreadPool()
void stop()

Stop all the workers normally.

template <class F, class... Args>
auto addJob(F &&f, Args&&... args)

Add a job to queue and return a std::future.

Note
The job will be executed asynchronously. Call std::future::get() when the execturation result is needed;

template <class F>
void addBatchJobs(const std::vector<F> &jobs, std::vector<typename std::result_of<F()>::type> &results)

Add a batch of jobs to the queue. The main thread will be blocked until these jobs are finished. The results will be stored in results according to jobs order.

Note
results may need to be carefully cleared before addBatchJobs().
Template Parameters
  • F: should have a return value.
Parameters
  • jobs: a vector of executable objection.
  • results: a vector to store the results.

template <class F>
void addBatchJobs(const std::vector<F> &jobs)

Add a batch of jobs reguardless of its result.

Template Parameters
  • F: don’t need to have a return value.
Parameters
  • jobs: a vector of executable objection.

Public Static Functions

static AsyncThreadPool &ProcessChannel(size_t initThreadNum = 0)

A process-wide singleton. Used as a global thread pool It should be initialized by calling AsyncThreadPool::ProcessChannel(N) first to create N threads, then call AsyncThreadPool::ProcessChannel() will get the thread pool.

Protected Functions

void run()

Execute the jobs in the job queue.