提交 27949c35 编写于 作者: M Matt Caswell

Simplify async pool handling

A lot of the pool handling code was in the arch specific files, but was
actually boiler plate and the same across the implementations. This commit
moves as much code as possible out of the arch specific files.
Reviewed-by: NRich Salz <rsalz@openssl.org>
上级 2b2c78d4
...@@ -56,41 +56,6 @@ ...@@ -56,41 +56,6 @@
#ifdef ASYNC_NULL #ifdef ASYNC_NULL
STACK_OF(ASYNC_JOB) *async_get_pool(void)
{
return NULL;
}
int async_set_pool(STACK_OF(ASYNC_JOB) *poolin, size_t curr_size,
size_t max_size)
{
return 0;
}
void async_increment_pool_size(void)
{
return;
}
void async_release_job_to_pool(ASYNC_JOB *job)
{
return;
}
size_t async_pool_max_size(void)
{
return 0;
}
void async_release_pool(void)
{
return;
}
int async_pool_can_grow(void) {
return 0;
}
int async_pipe(OSSL_ASYNC_FD *pipefds) int async_pipe(OSSL_ASYNC_FD *pipefds)
{ {
return -1; return -1;
......
...@@ -72,5 +72,7 @@ typedef struct async_fibre_st { ...@@ -72,5 +72,7 @@ typedef struct async_fibre_st {
# define async_fibre_makecontext(c) # define async_fibre_makecontext(c)
# define async_fibre_free(f) # define async_fibre_free(f)
# define async_fibre_init_dispatcher(f) # define async_fibre_init_dispatcher(f)
# define async_get_pool() NULL
# define async_set_pool(p) 0
#endif #endif
...@@ -61,17 +61,11 @@ ...@@ -61,17 +61,11 @@
# include <openssl/crypto.h> # include <openssl/crypto.h>
# include <openssl/async.h> # include <openssl/async.h>
__thread async_ctx *sysvctx; __thread async_ctx *posixctx;
__thread async_pool *posixpool;
#define STACKSIZE 32768 #define STACKSIZE 32768
extern __thread size_t posixpool_max_size;
extern __thread size_t posixpool_curr_size;
extern __thread STACK_OF(ASYNC_JOB) *posixpool;
__thread size_t posixpool_max_size = 0;
__thread size_t posixpool_curr_size = 0;
__thread STACK_OF(ASYNC_JOB) *posixpool = NULL;
int async_fibre_init(async_fibre *fibre) int async_fibre_init(async_fibre *fibre)
{ {
void *stack = NULL; void *stack = NULL;
...@@ -103,6 +97,14 @@ int async_pipe(OSSL_ASYNC_FD *pipefds) ...@@ -103,6 +97,14 @@ int async_pipe(OSSL_ASYNC_FD *pipefds)
return 0; return 0;
} }
int async_close_fd(OSSL_ASYNC_FD fd)
{
if (close(fd) != 0)
return 0;
return 1;
}
int async_write1(OSSL_ASYNC_FD fd, const void *buf) int async_write1(OSSL_ASYNC_FD fd, const void *buf)
{ {
if (write(fd, buf, 1) > 0) if (write(fd, buf, 1) > 0)
...@@ -119,45 +121,4 @@ int async_read1(OSSL_ASYNC_FD fd, void *buf) ...@@ -119,45 +121,4 @@ int async_read1(OSSL_ASYNC_FD fd, void *buf)
return 0; return 0;
} }
STACK_OF(ASYNC_JOB) *async_get_pool(void)
{
return posixpool;
}
int async_set_pool(STACK_OF(ASYNC_JOB) *poolin, size_t curr_size,
size_t max_size)
{
posixpool = poolin;
posixpool_curr_size = curr_size;
posixpool_max_size = max_size;
return 1;
}
void async_increment_pool_size(void)
{
posixpool_curr_size++;
}
void async_release_job_to_pool(ASYNC_JOB *job)
{
sk_ASYNC_JOB_push(posixpool, job);
}
size_t async_pool_max_size(void)
{
return posixpool_max_size;
}
void async_release_pool(void)
{
sk_ASYNC_JOB_free(posixpool);
posixpool = NULL;
}
int async_pool_can_grow(void)
{
return (posixpool_max_size == 0)
|| (posixpool_curr_size < posixpool_max_size);
}
#endif #endif
...@@ -73,7 +73,8 @@ ...@@ -73,7 +73,8 @@
# include <setjmp.h> # include <setjmp.h>
# include "e_os.h" # include "e_os.h"
extern __thread async_ctx *sysvctx; extern __thread async_ctx *posixctx;
extern __thread async_pool *posixpool;
typedef struct async_fibre_st { typedef struct async_fibre_st {
ucontext_t fibre; ucontext_t fibre;
...@@ -81,8 +82,10 @@ typedef struct async_fibre_st { ...@@ -81,8 +82,10 @@ typedef struct async_fibre_st {
int env_init; int env_init;
} async_fibre; } async_fibre;
# define async_set_ctx(nctx) (sysvctx = (nctx)) # define async_set_ctx(nctx) (posixctx = (nctx))
# define async_get_ctx() (sysvctx) # define async_get_ctx() (posixctx)
# define async_set_pool(p) (posixpool = (p))
# define async_get_pool() (posixpool)
static inline int async_fibre_swapcontext(async_fibre *o, async_fibre *n, int r) static inline int async_fibre_swapcontext(async_fibre *o, async_fibre *n, int r)
{ {
......
...@@ -51,7 +51,7 @@ ...@@ -51,7 +51,7 @@
* ==================================================================== * ====================================================================
*/ */
#include "async_win.h" #include "../async_locl.h"
#ifdef ASYNC_WIN #ifdef ASYNC_WIN
...@@ -95,6 +95,14 @@ int async_pipe(OSSL_ASYNC_FD *pipefds) ...@@ -95,6 +95,14 @@ int async_pipe(OSSL_ASYNC_FD *pipefds)
return 1; return 1;
} }
int async_close_fd(OSSL_ASYNC_FD fd)
{
if (CloseHandle(fd) == 0)
return 0;
return 1;
}
int async_write1(OSSL_ASYNC_FD fd, const void *buf) int async_write1(OSSL_ASYNC_FD fd, const void *buf)
{ {
DWORD numwritten = 0; DWORD numwritten = 0;
...@@ -115,70 +123,17 @@ int async_read1(OSSL_ASYNC_FD fd, void *buf) ...@@ -115,70 +123,17 @@ int async_read1(OSSL_ASYNC_FD fd, void *buf)
return 0; return 0;
} }
STACK_OF(ASYNC_JOB) *async_get_pool(void) async_pool *async_get_pool(void)
{ {
struct winpool *pool; return (async_pool *)
pool = (struct winpool *)
CRYPTO_get_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL); CRYPTO_get_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL);
return pool->pool;
} }
int async_set_pool(STACK_OF(ASYNC_JOB) *poolin, size_t curr_size, int async_set_pool(async_pool *pool)
size_t max_size)
{ {
struct winpool *pool;
pool = OPENSSL_malloc(sizeof *pool);
if (pool == NULL)
return 0;
pool->pool = poolin;
pool->curr_size = curr_size;
pool->max_size = max_size;
CRYPTO_set_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL, (void *)pool); CRYPTO_set_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL, (void *)pool);
return 1; return 1;
} }
void async_increment_pool_size(void)
{
struct winpool *pool;
pool = (struct winpool *)
CRYPTO_get_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL);
pool->curr_size++;
}
void async_release_job_to_pool(ASYNC_JOB *job)
{
struct winpool *pool;
pool = (struct winpool *)
CRYPTO_get_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL);
sk_ASYNC_JOB_push(pool->pool, job);
}
size_t async_pool_max_size(void)
{
struct winpool *pool;
pool = (struct winpool *)
CRYPTO_get_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL);
return pool->max_size;
}
void async_release_pool(void)
{
struct winpool *pool;
pool = (struct winpool *)
CRYPTO_get_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL);
sk_ASYNC_JOB_free(pool->pool);
OPENSSL_free(pool);
CRYPTO_set_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL, NULL);
}
int async_pool_can_grow(void)
{
struct winpool *pool;
pool = (struct winpool *)
CRYPTO_get_thread_local(CRYPTO_THREAD_LOCAL_ASYNC_POOL);
return (pool->max_size == 0) || (pool->curr_size < pool->max_size);
}
#endif #endif
...@@ -81,4 +81,7 @@ typedef struct async_fibre_st { ...@@ -81,4 +81,7 @@ typedef struct async_fibre_st {
int async_fibre_init_dispatcher(async_fibre *fibre); int async_fibre_init_dispatcher(async_fibre *fibre);
VOID CALLBACK async_start_func_win(PVOID unused); VOID CALLBACK async_start_func_win(PVOID unused);
async_pool *async_get_pool(void);
int async_set_pool(async_pool *pool);
#endif #endif
...@@ -69,6 +69,8 @@ ...@@ -69,6 +69,8 @@
#define ASYNC_JOB_PAUSED 2 #define ASYNC_JOB_PAUSED 2
#define ASYNC_JOB_STOPPING 3 #define ASYNC_JOB_STOPPING 3
static void async_free_pool_internal(async_pool *pool);
static async_ctx *async_ctx_new(void) static async_ctx *async_ctx_new(void)
{ {
async_ctx *nctx = NULL; async_ctx *nctx = NULL;
...@@ -138,13 +140,15 @@ static void async_job_free(ASYNC_JOB *job) ...@@ -138,13 +140,15 @@ static void async_job_free(ASYNC_JOB *job)
if (job != NULL) { if (job != NULL) {
OPENSSL_free(job->funcargs); OPENSSL_free(job->funcargs);
async_fibre_free(&job->fibrectx); async_fibre_free(&job->fibrectx);
async_close_fd(job->wait_fd);
async_close_fd(job->wake_fd);
OPENSSL_free(job); OPENSSL_free(job);
} }
} }
static ASYNC_JOB *async_get_pool_job(void) { static ASYNC_JOB *async_get_pool_job(void) {
ASYNC_JOB *job; ASYNC_JOB *job;
STACK_OF(ASYNC_JOB) *pool; async_pool *pool;
pool = async_get_pool(); pool = async_get_pool();
if (pool == NULL) { if (pool == NULL) {
...@@ -157,26 +161,28 @@ static ASYNC_JOB *async_get_pool_job(void) { ...@@ -157,26 +161,28 @@ static ASYNC_JOB *async_get_pool_job(void) {
pool = async_get_pool(); pool = async_get_pool();
} }
job = sk_ASYNC_JOB_pop(pool); job = sk_ASYNC_JOB_pop(pool->jobs);
if (job == NULL) { if (job == NULL) {
/* Pool is empty */ /* Pool is empty */
if (!async_pool_can_grow()) if ((pool->max_size != 0) && (pool->curr_size >= pool->max_size))
return NULL; return NULL;
job = async_job_new(); job = async_job_new();
if (job) { if (job) {
async_fibre_makecontext(&job->fibrectx); async_fibre_makecontext(&job->fibrectx);
async_increment_pool_size(); pool->curr_size++;
} }
} }
return job; return job;
} }
static void async_release_job(ASYNC_JOB *job) { static void async_release_job(ASYNC_JOB *job) {
async_pool *pool;
pool = async_get_pool();
OPENSSL_free(job->funcargs); OPENSSL_free(job->funcargs);
job->funcargs = NULL; job->funcargs = NULL;
/* Ignore error return */ sk_ASYNC_JOB_push(pool->jobs, job);
async_release_job_to_pool(job);
} }
void async_start_func(void) void async_start_func(void)
...@@ -309,31 +315,49 @@ int ASYNC_pause_job(void) ...@@ -309,31 +315,49 @@ int ASYNC_pause_job(void)
return 1; return 1;
} }
static void async_empty_pool(STACK_OF(ASYNC_JOB) *pool) static void async_empty_pool(async_pool *pool)
{ {
ASYNC_JOB *job; ASYNC_JOB *job;
if (!pool || !pool->jobs)
return;
do { do {
job = sk_ASYNC_JOB_pop(pool); job = sk_ASYNC_JOB_pop(pool->jobs);
async_job_free(job); async_job_free(job);
} while (job); } while (job);
} }
int ASYNC_init_pool(size_t max_size, size_t init_size) int ASYNC_init_pool(size_t max_size, size_t init_size)
{ {
STACK_OF(ASYNC_JOB) *pool; async_pool *pool;
size_t curr_size = 0; size_t curr_size = 0;
if (init_size > max_size) { if (init_size > max_size || max_size == 0) {
ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ASYNC_R_INVALID_POOL_SIZE); ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ASYNC_R_INVALID_POOL_SIZE);
return 0; return 0;
} }
pool = sk_ASYNC_JOB_new_null(); if(async_get_pool() != NULL) {
ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ASYNC_R_POOL_ALREADY_INITED);
return 0;
}
pool = OPENSSL_zalloc(sizeof *pool);
if (pool == NULL) { if (pool == NULL) {
ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ERR_R_MALLOC_FAILURE); ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ERR_R_MALLOC_FAILURE);
return 0; return 0;
} }
pool->jobs = sk_ASYNC_JOB_new_null();
if (pool->jobs == NULL) {
ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ERR_R_MALLOC_FAILURE);
OPENSSL_free(pool);
return 0;
}
pool->max_size = max_size;
/* Pre-create jobs as required */ /* Pre-create jobs as required */
while (init_size) { while (init_size) {
ASYNC_JOB *job; ASYNC_JOB *job;
...@@ -341,7 +365,7 @@ int ASYNC_init_pool(size_t max_size, size_t init_size) ...@@ -341,7 +365,7 @@ int ASYNC_init_pool(size_t max_size, size_t init_size)
if (job) { if (job) {
async_fibre_makecontext(&job->fibrectx); async_fibre_makecontext(&job->fibrectx);
job->funcargs = NULL; job->funcargs = NULL;
sk_ASYNC_JOB_push(pool, job); sk_ASYNC_JOB_push(pool->jobs, job);
curr_size++; curr_size++;
init_size--; init_size--;
} else { } else {
...@@ -352,30 +376,36 @@ int ASYNC_init_pool(size_t max_size, size_t init_size) ...@@ -352,30 +376,36 @@ int ASYNC_init_pool(size_t max_size, size_t init_size)
init_size = 0; init_size = 0;
} }
} }
pool->curr_size = curr_size;
if (!async_set_pool(pool, curr_size, max_size)) { if (!async_set_pool(pool)) {
ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ASYNC_R_FAILED_TO_SET_POOL); ASYNCerr(ASYNC_F_ASYNC_INIT_POOL, ASYNC_R_FAILED_TO_SET_POOL);
async_empty_pool(pool); goto err;
sk_ASYNC_JOB_free(pool);
return 0;
} }
return 1; return 1;
err:
async_free_pool_internal(pool);
return 0;
} }
void ASYNC_free_pool(void) static void async_free_pool_internal(async_pool *pool)
{ {
STACK_OF(ASYNC_JOB) *pool;
pool = async_get_pool();
if (pool == NULL) if (pool == NULL)
return; return;
async_empty_pool(pool); async_empty_pool(pool);
async_release_pool(); sk_ASYNC_JOB_free(pool->jobs);
OPENSSL_free(pool);
async_set_pool(NULL);
async_ctx_free(); async_ctx_free();
} }
void ASYNC_free_pool(void)
{
async_free_pool_internal(async_get_pool());
}
ASYNC_JOB *ASYNC_get_current_job(void) ASYNC_JOB *ASYNC_get_current_job(void)
{ {
async_ctx *ctx; async_ctx *ctx;
......
...@@ -70,11 +70,11 @@ ...@@ -70,11 +70,11 @@
# define ERR_REASON(reason) ERR_PACK(ERR_LIB_ASYNC,0,reason) # define ERR_REASON(reason) ERR_PACK(ERR_LIB_ASYNC,0,reason)
static ERR_STRING_DATA ASYNC_str_functs[] = { static ERR_STRING_DATA ASYNC_str_functs[] = {
{ERR_FUNC(ASYNC_F_ASYNC_CTX_NEW), "ASYNC_CTX_NEW"}, {ERR_FUNC(ASYNC_F_ASYNC_CTX_NEW), "async_ctx_new"},
{ERR_FUNC(ASYNC_F_ASYNC_INIT_POOL), "ASYNC_init_pool"}, {ERR_FUNC(ASYNC_F_ASYNC_INIT_POOL), "ASYNC_init_pool"},
{ERR_FUNC(ASYNC_F_ASYNC_JOB_NEW), "ASYNC_JOB_NEW"}, {ERR_FUNC(ASYNC_F_ASYNC_JOB_NEW), "async_job_new"},
{ERR_FUNC(ASYNC_F_ASYNC_PAUSE_JOB), "ASYNC_pause_job"}, {ERR_FUNC(ASYNC_F_ASYNC_PAUSE_JOB), "ASYNC_pause_job"},
{ERR_FUNC(ASYNC_F_ASYNC_START_FUNC), "ASYNC_START_FUNC"}, {ERR_FUNC(ASYNC_F_ASYNC_START_FUNC), "async_start_func"},
{ERR_FUNC(ASYNC_F_ASYNC_START_JOB), "ASYNC_start_job"}, {ERR_FUNC(ASYNC_F_ASYNC_START_JOB), "ASYNC_start_job"},
{0, NULL} {0, NULL}
}; };
...@@ -84,6 +84,7 @@ static ERR_STRING_DATA ASYNC_str_reasons[] = { ...@@ -84,6 +84,7 @@ static ERR_STRING_DATA ASYNC_str_reasons[] = {
{ERR_REASON(ASYNC_R_FAILED_TO_SET_POOL), "failed to set pool"}, {ERR_REASON(ASYNC_R_FAILED_TO_SET_POOL), "failed to set pool"},
{ERR_REASON(ASYNC_R_FAILED_TO_SWAP_CONTEXT), "failed to swap context"}, {ERR_REASON(ASYNC_R_FAILED_TO_SWAP_CONTEXT), "failed to swap context"},
{ERR_REASON(ASYNC_R_INVALID_POOL_SIZE), "invalid pool size"}, {ERR_REASON(ASYNC_R_INVALID_POOL_SIZE), "invalid pool size"},
{ERR_REASON(ASYNC_R_POOL_ALREADY_INITED), "pool already inited"},
{0, NULL} {0, NULL}
}; };
......
...@@ -55,6 +55,7 @@ ...@@ -55,6 +55,7 @@
#include <openssl/crypto.h> #include <openssl/crypto.h>
typedef struct async_ctx_st async_ctx; typedef struct async_ctx_st async_ctx;
typedef struct async_pool_st async_pool;
#include "arch/async_win.h" #include "arch/async_win.h"
#include "arch/async_posix.h" #include "arch/async_posix.h"
...@@ -79,15 +80,14 @@ struct async_job_st { ...@@ -79,15 +80,14 @@ struct async_job_st {
DECLARE_STACK_OF(ASYNC_JOB) DECLARE_STACK_OF(ASYNC_JOB)
struct async_pool_st {
STACK_OF(ASYNC_JOB) *jobs;
size_t curr_size;
size_t max_size;
};
void async_start_func(void); void async_start_func(void);
STACK_OF(ASYNC_JOB) *async_get_pool(void);
int async_set_pool(STACK_OF(ASYNC_JOB) *poolin, size_t curr_size,
size_t max_size);
void async_increment_pool_size(void);
void async_release_job_to_pool(ASYNC_JOB *job);
size_t async_pool_max_size(void);
void async_release_pool(void);
int async_pool_can_grow(void);
int async_pipe(OSSL_ASYNC_FD *pipefds); int async_pipe(OSSL_ASYNC_FD *pipefds);
int async_close_fd(OSSL_ASYNC_FD fd);
int async_write1(OSSL_ASYNC_FD fd, const void *buf); int async_write1(OSSL_ASYNC_FD fd, const void *buf);
int async_read1(OSSL_ASYNC_FD fd, void *buf); int async_read1(OSSL_ASYNC_FD fd, void *buf);
...@@ -111,6 +111,7 @@ void ERR_load_ASYNC_strings(void); ...@@ -111,6 +111,7 @@ void ERR_load_ASYNC_strings(void);
# define ASYNC_R_FAILED_TO_SET_POOL 101 # define ASYNC_R_FAILED_TO_SET_POOL 101
# define ASYNC_R_FAILED_TO_SWAP_CONTEXT 102 # define ASYNC_R_FAILED_TO_SWAP_CONTEXT 102
# define ASYNC_R_INVALID_POOL_SIZE 103 # define ASYNC_R_INVALID_POOL_SIZE 103
# define ASYNC_R_POOL_ALREADY_INITED 104
#ifdef __cplusplus #ifdef __cplusplus
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册