提交 c8bf9f24 编写于 作者: Y YuQing

connection_pool support validate connection on error

上级 4011fcb3
Version 1.44 2020-04-24
Version 1.44 2020-04-26
* add test file src/tests/test_pthread_lock.c
* add uniq_skiplist.[hc]
* add function split_string_ex
......@@ -26,6 +26,7 @@ Version 1.44 2020-04-24
* bugfixed: call fast_mblock_destroy in common_blocked_queue_destroy
* add function fc_get_file_line_count_ex
* uniq_skiplist add function find_ge and support bidirection
* connection_pool support validate connection on error
Version 1.43 2019-12-25
* replace function call system to getExecResult,
......
......@@ -4,7 +4,7 @@
%define CommitVersion %(echo $COMMIT_VERSION)
Name: libfastcommon
Version: 1.0.43
Version: 1.0.44
Release: 1%{?dist}
Summary: c common functions library extracted from my open source projects FastDFS
License: LGPL
......
......@@ -124,6 +124,8 @@ extern int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int kind);
#define FC_IS_LETTER(ch) ((ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z'))
#define FC_IS_UPPER_LETTER(ch) (ch >= 'A' && ch <= 'Z')
#define FC_IS_LOWER_LETTER(ch) (ch >= 'a' && ch <= 'z')
#define FC_MIN(v1, v2) (v1 < v2 ? v1 : v2)
#define FC_MAX(v1, v2) (v1 > v2 ? v1 : v2)
#define STRERROR(no) (strerror(no) != NULL ? strerror(no) : "Unkown error")
......
......@@ -15,11 +15,13 @@
#include "sched_thread.h"
#include "connection_pool.h"
int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, \
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time,
const int socket_domain)
const int socket_domain, const int htable_init_capacity,
fc_validate_connection_func validate_func, void *validate_args)
{
int result;
int init_capacity;
if ((result=init_pthread_lock(&cp->lock)) != 0)
{
......@@ -29,22 +31,34 @@ int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout, \
cp->max_count_per_entry = max_count_per_entry;
cp->max_idle_time = max_idle_time;
cp->socket_domain = socket_domain;
cp->validate_callback.func = validate_func;
cp->validate_callback.args = validate_args;
return hash_init(&(cp->hash_array), simple_hash, 1024, 0.75);
}
init_capacity = htable_init_capacity > 0 ? htable_init_capacity : 256;
if ((result=fast_mblock_init_ex1(&cp->manager_allocator, "cpool_manager",
sizeof(ConnectionManager), init_capacity, NULL, NULL,
false)) != 0)
{
return result;
}
int conn_pool_init(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time)
{
const int socket_domain = AF_INET;
return conn_pool_init_ex(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain);
if ((result=fast_mblock_init_ex1(&cp->node_allocator, "cpool_node",
sizeof(ConnectionNode) + sizeof(ConnectionInfo),
4 * init_capacity, NULL, NULL, false)) != 0)
{
return result;
}
return hash_init(&(cp->hash_array), simple_hash, init_capacity, 0.75);
}
int coon_pool_close_connections(const int index, const HashData *data, void *args)
static int coon_pool_close_connections(const int index,
const HashData *data, void *args)
{
ConnectionPool *cp;
ConnectionManager *cm;
cp = (ConnectionPool *)args;
cm = (ConnectionManager *)data->value;
if (cm != NULL)
{
......@@ -58,9 +72,10 @@ int coon_pool_close_connections(const int index, const HashData *data, void *arg
node = node->next;
conn_pool_disconnect_server(deleted->conn);
free(deleted);
fast_mblock_free_object(&cp->node_allocator, deleted);
}
free(cm);
fast_mblock_free_object(&cp->manager_allocator, cm);
}
return 0;
......@@ -69,7 +84,7 @@ int coon_pool_close_connections(const int index, const HashData *data, void *arg
void conn_pool_destroy(ConnectionPool *cp)
{
pthread_mutex_lock(&cp->lock);
hash_walk(&(cp->hash_array), coon_pool_close_connections, NULL);
hash_walk(&(cp->hash_array), coon_pool_close_connections, cp);
hash_destroy(&(cp->hash_array));
pthread_mutex_unlock(&cp->lock);
......@@ -163,7 +178,6 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
{
char key[INET6_ADDRSTRLEN + 8];
int key_len;
int bytes;
char *p;
ConnectionManager *cm;
ConnectionNode *node;
......@@ -176,15 +190,14 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
cm = (ConnectionManager *)hash_find(&cp->hash_array, key, key_len);
if (cm == NULL)
{
cm = (ConnectionManager *)malloc(sizeof(ConnectionManager));
cm = (ConnectionManager *)fast_mblock_alloc_object(
&cp->manager_allocator);
if (cm == NULL)
{
*err_no = errno != 0 ? errno : ENOMEM;
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, errno: %d, " \
"error info: %s", __LINE__, \
(int)sizeof(ConnectionManager), \
*err_no, STRERROR(*err_no));
*err_no = ENOMEM;
logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail", __LINE__,
(int)sizeof(ConnectionManager));
pthread_mutex_unlock(&cp->lock);
return NULL;
}
......@@ -220,18 +233,16 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
return NULL;
}
bytes = sizeof(ConnectionNode) + sizeof(ConnectionInfo);
p = (char *)malloc(bytes);
p = (char *)fast_mblock_alloc_object(&cp->node_allocator);
if (p == NULL)
{
*err_no = errno != 0 ? errno : ENOMEM;
logError("file: "__FILE__", line: %d, " \
"malloc %d bytes fail, errno: %d, " \
"error info: %s", __LINE__, \
bytes, *err_no, STRERROR(*err_no));
pthread_mutex_unlock(&cm->lock);
return NULL;
}
{
*err_no = ENOMEM;
logError("file: "__FILE__", line: %d, "
"malloc %d bytes fail", __LINE__, (int)
(sizeof(ConnectionNode) + sizeof(ConnectionInfo)));
pthread_mutex_unlock(&cm->lock);
return NULL;
}
node = (ConnectionNode *)p;
node->conn = (ConnectionInfo *)(p + sizeof(ConnectionNode));
......@@ -245,15 +256,16 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
memcpy(node->conn, conn, sizeof(ConnectionInfo));
node->conn->socket_domain = cp->socket_domain;
node->conn->sock = -1;
*err_no = conn_pool_connect_server(node->conn, \
node->conn->validate_flag = false;
*err_no = conn_pool_connect_server(node->conn,
cp->connect_timeout);
if (*err_no != 0)
{
pthread_mutex_lock(&cm->lock);
cm->total_count--; //rollback
fast_mblock_free_object(&cp->node_allocator, p);
pthread_mutex_unlock(&cm->lock);
free(p);
return NULL;
}
......@@ -267,6 +279,8 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
}
else
{
bool invalid;
node = cm->head;
ci = node->conn;
cm->head = node->next;
......@@ -274,6 +288,28 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
if (current_time - node->atime > cp->max_idle_time)
{
invalid = true;
}
else if (ci->validate_flag)
{
ci->validate_flag = false;
if (cp->validate_callback.func != NULL)
{
invalid = cp->validate_callback.func(ci,
cp->validate_callback.args) != 0;
}
else
{
invalid = false;
}
}
else
{
invalid = false;
}
if (invalid)
{
cm->total_count--;
logDebug("file: "__FILE__", line: %d, " \
......@@ -287,7 +323,7 @@ ConnectionInfo *conn_pool_get_connection(ConnectionPool *cp,
cm->free_count);
conn_pool_disconnect_server(ci);
free(node);
fast_mblock_free_object(&cp->node_allocator, node);
continue;
}
......@@ -335,18 +371,25 @@ int conn_pool_close_connection_ex(ConnectionPool *cp, ConnectionInfo *conn,
pthread_mutex_lock(&cm->lock);
if (bForce)
{
cm->total_count--;
{
cm->total_count--;
logDebug("file: "__FILE__", line: %d, " \
"server %s:%d, release connection: %d, " \
"total_count: %d, free_count: %d",
__LINE__, conn->ip_addr, conn->port,
conn->sock, cm->total_count, cm->free_count);
logDebug("file: "__FILE__", line: %d, "
"server %s:%d, release connection: %d, "
"total_count: %d, free_count: %d",
__LINE__, conn->ip_addr, conn->port,
conn->sock, cm->total_count, cm->free_count);
conn_pool_disconnect_server(conn);
free(node);
}
conn_pool_disconnect_server(conn);
fast_mblock_free_object(&cp->node_allocator, node);
node = cm->head;
while (node != NULL)
{
node->conn->validate_flag = true;
node = node->next;
}
}
else
{
node->atime = get_current_time();
......
......@@ -16,9 +16,10 @@
#include <string.h>
#include <time.h>
#include "common_define.h"
#include "fast_mblock.h"
#include "ini_file_reader.h"
#include "pthread_func.h"
#include "hash.h"
#include "ini_file_reader.h"
#ifdef __cplusplus
extern "C" {
......@@ -35,11 +36,14 @@ extern "C" {
typedef struct
{
int sock;
int port;
short port;
short socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect
bool validate_flag; //for connection pool
char ip_addr[INET6_ADDRSTRLEN];
int socket_domain; //socket domain, AF_INET, AF_INET6 or AF_UNSPEC for auto dedect
} ConnectionInfo;
typedef int (*fc_validate_connection_func)(ConnectionInfo *conn, void *args);
struct tagConnectionManager;
typedef struct tagConnectionNode {
......@@ -68,6 +72,14 @@ typedef struct tagConnectionPool {
*/
int max_idle_time;
int socket_domain; //socket domain
struct fast_mblock_man manager_allocator;
struct fast_mblock_man node_allocator;
struct {
fc_validate_connection_func func;
void *args;
} validate_callback;
} ConnectionPool;
/**
......@@ -78,11 +90,33 @@ typedef struct tagConnectionPool {
* max_count_per_entry: max connection count per host:port
* max_idle_time: reconnect the server after max idle time in seconds
* socket_domain: the socket domain
* validate_func: the validate connection callback
* validate_args: the args for validate connection callback
* return 0 for success, != 0 for error
*/
int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout,
int conn_pool_init_ex1(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time,
const int socket_domain);
const int socket_domain, const int htable_init_capacity,
fc_validate_connection_func validate_func, void *validate_args);
/**
* init ex function
* parameters:
* cp: the ConnectionPool
* connect_timeout: the connect timeout in seconds
* max_count_per_entry: max connection count per host:port
* max_idle_time: reconnect the server after max idle time in seconds
* socket_domain: the socket domain
* return 0 for success, != 0 for error
*/
static inline int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time,
const int socket_domain)
{
const int htable_init_capacity = 0;
return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain, htable_init_capacity, NULL, NULL);
}
/**
* init function
......@@ -93,8 +127,14 @@ int conn_pool_init_ex(ConnectionPool *cp, int connect_timeout,
* max_idle_time: reconnect the server after max idle time in seconds
* return 0 for success, != 0 for error
*/
int conn_pool_init(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time);
static inline int conn_pool_init(ConnectionPool *cp, int connect_timeout,
const int max_count_per_entry, const int max_idle_time)
{
const int socket_domain = AF_INET;
const int htable_init_capacity = 0;
return conn_pool_init_ex1(cp, connect_timeout, max_count_per_entry,
max_idle_time, socket_domain, htable_init_capacity, NULL, NULL);
}
/**
* destroy function
......
......@@ -165,7 +165,8 @@ parameters:
return error no, 0 for success, != 0 fail
*/
static inline int fast_mblock_init_ex1(struct fast_mblock_man *mblock,
const char *name, const int element_size, const int alloc_elements_once,
const char *name, const int element_size,
const int alloc_elements_once,
fast_mblock_alloc_init_func init_func,
void *init_args, const bool need_lock)
{
......
......@@ -254,7 +254,7 @@ int main(int argc, char *argv[])
srand(time(NULL));
fast_mblock_manager_init();
result = uniq_skiplist_init_ex(&factory, LEVEL_COUNT, compare_func,
result = uniq_skiplist_init_ex2(&factory, LEVEL_COUNT, compare_func,
free_test_func, 0, MIN_ALLOC_ONCE, 0, true);
if (result != 0) {
return result;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册