/* * Copyright (c) 2020 YuQing <384681@qq.com> * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the Lesser GNU General Public License, version 3 * or later ("LGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the Lesser GNU General Public License * along with this program. If not, see . */ //fast_task_queue.h #ifndef _FAST_TASK_QUEUE_H #define _FAST_TASK_QUEUE_H #include #include #include #include #include "common_define.h" #include "ioevent.h" #include "fast_timer.h" #define FC_NOTIFY_READ_FD(tdata) (tdata)->pipe_fds[0] #define FC_NOTIFY_WRITE_FD(tdata) (tdata)->pipe_fds[1] #define ALIGNED_TASK_INFO_SIZE MEM_ALIGN(sizeof(struct fast_task_info)) struct nio_thread_data; struct fast_task_info; typedef int (*ThreadLoopCallback) (struct nio_thread_data *pThreadData); typedef int (*TaskFinishCallback) (struct fast_task_info *pTask); typedef void (*TaskCleanUpCallback) (struct fast_task_info *pTask); typedef int (*TaskInitCallback)(struct fast_task_info *pTask); typedef void (*IOEventCallback) (int sock, short event, void *arg); struct fast_task_info; typedef struct ioevent_entry { int fd; FastTimerEntry timer; IOEventCallback callback; } IOEventEntry; struct nio_thread_data { struct ioevent_puller ev_puller; struct fast_timer timer; int pipe_fds[2]; //for notify struct fast_task_info *deleted_list; //tasks for cleanup ThreadLoopCallback thread_loop_callback; void *arg; //extra argument pointer struct { struct fast_task_info *head; struct fast_task_info *tail; pthread_mutex_t lock; } waiting_queue; //task queue struct { bool enabled; volatile int64_t counter; } notify; //for thread notify }; struct fast_task_info { IOEventEntry event; //must first union { char server_ip[IP_ADDRESS_SIZE]; char client_ip[IP_ADDRESS_SIZE]; }; void *arg; //extra argument pointer char *data; //buffer for write or recv int size; //alloc size int length; //data length int offset; //current offset uint16_t port; //peer port struct { uint8_t current; uint8_t notify; } nio_stages; //stages for network IO bool canceled; //if task canceled int connect_timeout; //for client side int network_timeout; int64_t req_count; //request count TaskFinishCallback finish_callback; struct nio_thread_data *thread_data; void *ctx; //context pointer for libserverframe nio struct fast_task_info *next; }; struct fast_task_queue { struct fast_task_info *head; struct fast_task_info *tail; pthread_mutex_t lock; int max_connections; int alloc_connections; int alloc_task_once; int min_buff_size; int max_buff_size; int arg_size; int block_size; bool malloc_whole_block; TaskInitCallback init_callback; }; #ifdef __cplusplus extern "C" { #endif int free_queue_init_ex2(const int max_connections, const int init_connections, const int alloc_task_once, const int min_buff_size, const int max_buff_size, const int arg_size, TaskInitCallback init_callback); static inline int free_queue_init_ex(const int max_connections, const int init_connections, const int alloc_task_once, const int min_buff_size, const int max_buff_size, const int arg_size) { return free_queue_init_ex2(max_connections, init_connections, alloc_task_once, min_buff_size, max_buff_size, arg_size, NULL); } static inline int free_queue_init(const int max_connections, const int min_buff_size, const int max_buff_size, const int arg_size) { return free_queue_init_ex2(max_connections, max_connections, 0, min_buff_size, max_buff_size, arg_size, NULL); } void free_queue_destroy(); int free_queue_push(struct fast_task_info *pTask); struct fast_task_info *free_queue_pop(); int free_queue_count(); int free_queue_alloc_connections(); int free_queue_set_buffer_size(struct fast_task_info *pTask, const int expect_size); int free_queue_realloc_buffer(struct fast_task_info *pTask, const int expect_size); int free_queue_set_max_buffer_size(struct fast_task_info *pTask); int free_queue_realloc_max_buffer(struct fast_task_info *pTask); int task_queue_init(struct fast_task_queue *pQueue); int task_queue_push(struct fast_task_queue *pQueue, \ struct fast_task_info *pTask); struct fast_task_info *task_queue_pop(struct fast_task_queue *pQueue); int task_queue_count(struct fast_task_queue *pQueue); int task_queue_set_buffer_size(struct fast_task_queue *pQueue, struct fast_task_info *pTask, const int expect_size); int task_queue_realloc_buffer(struct fast_task_queue *pQueue, struct fast_task_info *pTask, const int expect_size); int task_queue_get_new_buffer_size(const int min_buff_size, const int max_buff_size, const int expect_size, int *new_size); #ifdef __cplusplus } #endif #endif