提交 c2d8faab 编写于 作者: Y YuQing

thread pool enhance

上级 4b085fbc
Version 1.44 2020-07-20 Version 1.44 2020-08-05
* add test file src/tests/test_pthread_lock.c * add test file src/tests/test_pthread_lock.c
* add uniq_skiplist.[hc] * add uniq_skiplist.[hc]
* add function split_string_ex * add function split_string_ex
......
...@@ -2989,17 +2989,17 @@ int fc_delete_file_ex(const char *filename, const char *caption) ...@@ -2989,17 +2989,17 @@ int fc_delete_file_ex(const char *filename, const char *caption)
return result; return result;
} }
bool fc_is_prime(const int n) bool fc_is_prime(const int64_t n)
{ {
int loop; int64_t loop;
int i; int64_t i;
if (n <= 0) if (n <= 0)
{ {
return false; return false;
} }
loop = lround(sqrt((double)n)); loop = llround(sqrt((double)n));
for (i=2; i<=loop; i++) for (i=2; i<=loop; i++)
{ {
if (n % i == 0) if (n % i == 0)
...@@ -3011,10 +3011,10 @@ bool fc_is_prime(const int n) ...@@ -3011,10 +3011,10 @@ bool fc_is_prime(const int n)
return true; return true;
} }
int fc_floor_prime(const int n) int64_t fc_floor_prime(const int64_t n)
{ {
int start; int64_t start;
int i; int64_t i;
start = (n % 2 == 0 ? n - 1 : n); start = (n % 2 == 0 ? n - 1 : n);
for (i = start; i > 0; i -= 2) for (i = start; i > 0; i -= 2)
...@@ -3028,9 +3028,9 @@ int fc_floor_prime(const int n) ...@@ -3028,9 +3028,9 @@ int fc_floor_prime(const int n)
return 1; return 1;
} }
int fc_ceil_prime(const int n) int64_t fc_ceil_prime(const int64_t n)
{ {
int i; int64_t i;
if (n <= 0) if (n <= 0)
{ {
......
...@@ -921,7 +921,7 @@ static inline int fc_delete_file(const char *filename) ...@@ -921,7 +921,7 @@ static inline int fc_delete_file(const char *filename)
* n: the number to detect * n: the number to detect
* return: true for prime number, otherwise false * return: true for prime number, otherwise false
*/ */
bool fc_is_prime(const int n); bool fc_is_prime(const int64_t n);
/** find the largest prime number not greater than n /** find the largest prime number not greater than n
...@@ -929,14 +929,14 @@ bool fc_is_prime(const int n); ...@@ -929,14 +929,14 @@ bool fc_is_prime(const int n);
* n: the number to detect * n: the number to detect
* return: the largest prime number near n * return: the largest prime number near n
*/ */
int fc_floor_prime(const int n); int64_t fc_floor_prime(const int64_t n);
/** find the smallest prime number not less than n /** find the smallest prime number not less than n
* parameters: * parameters:
* n: the number to detect * n: the number to detect
* return: the smallest prime number near n * return: the smallest prime number near n
*/ */
int fc_ceil_prime(const int n); int64_t fc_ceil_prime(const int64_t n);
/** init buffer /** init buffer
* parameters: * parameters:
......
...@@ -21,14 +21,18 @@ static void *thread_entrance(void *arg) ...@@ -21,14 +21,18 @@ static void *thread_entrance(void *arg)
thread = (FCThreadInfo *)arg; thread = (FCThreadInfo *)arg;
pool = thread->pool; pool = thread->pool;
if (pool->extra_data_callbacks.alloc != NULL) {
thread->tdata = pool->extra_data_callbacks.alloc();
}
PTHREAD_MUTEX_LOCK(&thread->lock); PTHREAD_MUTEX_LOCK(&thread->lock);
thread->inited = true; thread->inited = true;
PTHREAD_MUTEX_UNLOCK(&thread->lock); PTHREAD_MUTEX_UNLOCK(&thread->lock);
PTHREAD_MUTEX_LOCK(&pool->lock); PTHREAD_MUTEX_LOCK(&pool->lock);
pool->thread_counts.running++; pool->thread_counts.running++;
logInfo("tindex: %d start, tcount: %d", logInfo("thread pool: %s, index: %d start, running count: %d",
thread->index, pool->thread_counts.running); pool->name, thread->index, pool->thread_counts.running);
PTHREAD_MUTEX_UNLOCK(&pool->lock); PTHREAD_MUTEX_UNLOCK(&pool->lock);
running = true; running = true;
...@@ -37,12 +41,12 @@ static void *thread_entrance(void *arg) ...@@ -37,12 +41,12 @@ static void *thread_entrance(void *arg)
while (running && *pool->pcontinue_flag) { while (running && *pool->pcontinue_flag) {
PTHREAD_MUTEX_LOCK(&thread->lock); PTHREAD_MUTEX_LOCK(&thread->lock);
if (thread->func == NULL) { if (thread->callback.func == NULL) {
ts.tv_sec = get_current_time() + 2; ts.tv_sec = get_current_time() + 2;
pthread_cond_timedwait(&thread->cond, &thread->lock, &ts); pthread_cond_timedwait(&thread->cond, &thread->lock, &ts);
} }
callback = thread->func; callback = thread->callback.func;
if (callback == NULL) { if (callback == NULL) {
if (pool->max_idle_time > 0 && get_current_time() - if (pool->max_idle_time > 0 && get_current_time() -
last_run_time > pool->max_idle_time) last_run_time > pool->max_idle_time)
...@@ -59,13 +63,13 @@ static void *thread_entrance(void *arg) ...@@ -59,13 +63,13 @@ static void *thread_entrance(void *arg)
PTHREAD_MUTEX_UNLOCK(&pool->lock); PTHREAD_MUTEX_UNLOCK(&pool->lock);
} }
} else { } else {
thread->func = NULL; thread->callback.func = NULL;
} }
PTHREAD_MUTEX_UNLOCK(&thread->lock); PTHREAD_MUTEX_UNLOCK(&thread->lock);
if (callback != NULL) { if (callback != NULL) {
__sync_add_and_fetch(&pool->thread_counts.dealing, 1); __sync_add_and_fetch(&pool->thread_counts.dealing, 1);
callback(thread->arg); callback(thread->callback.arg, thread->tdata);
last_run_time = get_current_time(); last_run_time = get_current_time();
__sync_sub_and_fetch(&pool->thread_counts.dealing, 1); __sync_sub_and_fetch(&pool->thread_counts.dealing, 1);
...@@ -80,6 +84,11 @@ static void *thread_entrance(void *arg) ...@@ -80,6 +84,11 @@ static void *thread_entrance(void *arg)
} }
} }
if (pool->extra_data_callbacks.free != NULL && thread->tdata != NULL) {
pool->extra_data_callbacks.free(thread->tdata);
thread->tdata = NULL;
}
if (running) { if (running) {
PTHREAD_MUTEX_LOCK(&thread->lock); PTHREAD_MUTEX_LOCK(&thread->lock);
thread->inited = false; thread->inited = false;
...@@ -91,8 +100,8 @@ static void *thread_entrance(void *arg) ...@@ -91,8 +100,8 @@ static void *thread_entrance(void *arg)
} }
PTHREAD_MUTEX_LOCK(&pool->lock); PTHREAD_MUTEX_LOCK(&pool->lock);
logInfo("tindex: %d exit, tcount: %d", logInfo("thread pool: %s, index: %d exit, running count: %d",
thread->index, pool->thread_counts.running); pool->name, thread->index, pool->thread_counts.running);
PTHREAD_MUTEX_UNLOCK(&pool->lock); PTHREAD_MUTEX_UNLOCK(&pool->lock);
return NULL; return NULL;
...@@ -166,9 +175,10 @@ static int thread_pool_alloc_init(FCThreadPool *pool) ...@@ -166,9 +175,10 @@ static int thread_pool_alloc_init(FCThreadPool *pool)
return 0; return 0;
} }
int fc_thread_pool_init(FCThreadPool *pool, const int limit, int fc_thread_pool_init_ex(FCThreadPool *pool, const char *name,
const int stack_size, const int max_idle_time, const int limit, const int stack_size, const int max_idle_time,
const int min_idle_count, bool * volatile pcontinue_flag) const int min_idle_count, bool * volatile pcontinue_flag,
FCThreadExtraDataCallbacks *extra_data_callbacks)
{ {
int result; int result;
...@@ -176,6 +186,7 @@ int fc_thread_pool_init(FCThreadPool *pool, const int limit, ...@@ -176,6 +186,7 @@ int fc_thread_pool_init(FCThreadPool *pool, const int limit,
return result; return result;
} }
snprintf(pool->name, sizeof(pool->name), "%s", name);
pool->stack_size = stack_size; pool->stack_size = stack_size;
pool->max_idle_time = max_idle_time; pool->max_idle_time = max_idle_time;
if (min_idle_count > limit) { if (min_idle_count > limit) {
...@@ -187,6 +198,12 @@ int fc_thread_pool_init(FCThreadPool *pool, const int limit, ...@@ -187,6 +198,12 @@ int fc_thread_pool_init(FCThreadPool *pool, const int limit,
pool->thread_counts.running = 0; pool->thread_counts.running = 0;
pool->thread_counts.dealing = 0; pool->thread_counts.dealing = 0;
pool->pcontinue_flag = pcontinue_flag; pool->pcontinue_flag = pcontinue_flag;
if (extra_data_callbacks != NULL) {
pool->extra_data_callbacks = *extra_data_callbacks;
} else {
pool->extra_data_callbacks.alloc = NULL;
pool->extra_data_callbacks.free = NULL;
}
return thread_pool_alloc_init(pool); return thread_pool_alloc_init(pool);
} }
...@@ -223,8 +240,8 @@ int fc_thread_pool_run(FCThreadPool *pool, fc_thread_pool_callback func, ...@@ -223,8 +240,8 @@ int fc_thread_pool_run(FCThreadPool *pool, fc_thread_pool_callback func,
} }
PTHREAD_MUTEX_LOCK(&thread->lock); PTHREAD_MUTEX_LOCK(&thread->lock);
thread->func = func; thread->callback.func = func;
thread->arg = arg; thread->callback.arg = arg;
if (!thread->inited) { if (!thread->inited) {
result = fc_create_thread(&thread->tid, thread_entrance, result = fc_create_thread(&thread->tid, thread_entrance,
thread, pool->stack_size); thread, pool->stack_size);
......
...@@ -6,7 +6,15 @@ ...@@ -6,7 +6,15 @@
#include "fast_mblock.h" #include "fast_mblock.h"
#include "pthread_func.h" #include "pthread_func.h"
typedef void (*fc_thread_pool_callback)(void *arg); typedef void (*fc_thread_pool_callback)(void *arg, void *thread_data);
typedef void* (*fc_alloc_thread_extra_data_callback)();
typedef void (*fc_free_thread_extra_data_callback)(void *ptr);
typedef struct fc_thread_extra_data_callbacks
{
fc_alloc_thread_extra_data_callback alloc;
fc_free_thread_extra_data_callback free;
} FCThreadExtraDataCallbacks;
struct fc_thread_pool; struct fc_thread_pool;
typedef struct fc_thread_info typedef struct fc_thread_info
...@@ -16,14 +24,18 @@ typedef struct fc_thread_info ...@@ -16,14 +24,18 @@ typedef struct fc_thread_info
pthread_t tid; pthread_t tid;
pthread_mutex_t lock; pthread_mutex_t lock;
pthread_cond_t cond; pthread_cond_t cond;
fc_thread_pool_callback func; void *tdata; //thread data defined by the caller
void *arg; struct {
fc_thread_pool_callback func;
void *arg;
} callback;
struct fc_thread_pool *pool; struct fc_thread_pool *pool;
struct fc_thread_info *next; struct fc_thread_info *next;
} FCThreadInfo; } FCThreadInfo;
typedef struct fc_thread_pool typedef struct fc_thread_pool
{ {
char name[64];
FCThreadInfo *threads; //all thread info FCThreadInfo *threads; //all thread info
FCThreadInfo *freelist; FCThreadInfo *freelist;
pthread_mutex_t lock; pthread_mutex_t lock;
...@@ -38,15 +50,22 @@ typedef struct fc_thread_pool ...@@ -38,15 +50,22 @@ typedef struct fc_thread_pool
volatile int dealing; //dealing task thread count volatile int dealing; //dealing task thread count
} thread_counts; } thread_counts;
bool * volatile pcontinue_flag; bool * volatile pcontinue_flag;
FCThreadExtraDataCallbacks extra_data_callbacks;
} FCThreadPool; } FCThreadPool;
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
int fc_thread_pool_init(FCThreadPool *pool, const int limit, #define fc_thread_pool_init(pool, name, limit, stack_size, max_idle_time, \
const int stack_size, const int max_idle_time, min_idle_count, pcontinue_flag) \
const int min_idle_count, bool * volatile pcontinue_flag); fc_thread_pool_init_ex(pool, name, limit, stack_size, max_idle_time, \
min_idle_count, pcontinue_flag, NULL)
int fc_thread_pool_init_ex(FCThreadPool *pool, const char *name,
const int limit, const int stack_size, const int max_idle_time,
const int min_idle_count, bool * volatile pcontinue_flag,
FCThreadExtraDataCallbacks *extra_data_callbacks);
void fc_thread_pool_destroy(FCThreadPool *pool); void fc_thread_pool_destroy(FCThreadPool *pool);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册