提交 81515c09 编写于 作者: D Dmitry Kozlov

rewriting triton library ...

上级 2b63c6e6
...@@ -8,5 +8,6 @@ SET(sources_c ...@@ -8,5 +8,6 @@ SET(sources_c
log.c log.c
) )
INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR}) INCLUDE_DIRECTORIES(${CMAKE_CURRENT_SOURCE_DIR})
ADD_DEFINITIONS("-DUSE_SPINLOCK")
ADD_LIBRARY(${target} STATIC ${sources_c}) ADD_LIBRARY(${target} STATIC ${sources_c})
...@@ -19,101 +19,88 @@ static char* skip_word(char *str); ...@@ -19,101 +19,88 @@ static char* skip_word(char *str);
static struct conf_sect_t *find_sect(const char *name); static struct conf_sect_t *find_sect(const char *name);
static struct conf_sect_t *create_sect(const char *name); static struct conf_sect_t *create_sect(const char *name);
static void sect_add_item(struct conf_sect_t *sect,const char *name,const char *val); static void sect_add_item(struct conf_sect_t *sect, const char *name, const char *val);
static struct conf_option_t *find_item(struct conf_sect_t *,const char *name); static struct conf_option_t *find_item(struct conf_sect_t *, const char *name);
int conf_load(const char *fname) int conf_load(const char *fname)
{ {
char *buf,*str,*str2; char *buf,*str,*str2;
char *path0,*path; char *path0,*path;
int cur_line=0; int cur_line = 0;
static struct conf_sect_t *cur_sect=NULL; static struct conf_sect_t *cur_sect = NULL;
FILE *f=fopen(fname,"r");
if (!f) FILE *f = fopen(fname, "r");
{ if (!f) {
perror("conf_file: open"); perror("conf_file:open");
return -1; return -1;
} }
buf=(char*)malloc(1024); buf = malloc(1024);
path0=(char*)malloc(4096); path0 = malloc(4096);
path=(char*)malloc(4096); path = malloc(4096);
getcwd(path0,1024); getcwd(path0, 1024);
while(!feof(f)) while(!feof(f)) {
{ buf = fgets(buf, 1024, f);
buf=fgets(buf,1024,f); if (!buf)
if (!buf) break; break;
++cur_line; ++cur_line;
if (buf[strlen(buf)-1]=='\n') if (buf[strlen(buf) - 1] == '\n')
buf[strlen(buf)-1]=0; buf[strlen(buf) - 1] = 0;
str=skip_space(buf); str = skip_space(buf);
if (*str=='#' || *str==0) continue; if (*str == '#' || *str == 0)
if (strncmp(str,"$include",8)==0) continue;
{ if (strncmp(str, "$include", 8) == 0) {
str=skip_word(str); str = skip_word(str);
str=skip_space(str); str = skip_space(str);
/*if (*str=='.')
{
strcpy(path,path0);
strcat(path,str+1);
str=path;
}*/
conf_load(str); conf_load(str);
continue; continue;
} }
if (*str=='[') if (*str == '[') {
{ for (str2 = ++str; *str2 && *str2 != ']'; str2++);
for (str2=++str; *str2 && *str2!=']'; str2++); if (*str2 != ']') {
if (*str2!=']') fprintf(stderr, "conf_file:%s:%i: sintax error\n", fname, cur_line);
{
//L1:
fprintf(stderr,"conf_file:%s:%i: sintax error\n",fname,cur_line);
return -1; return -1;
} }
*str2=0; *str2 = 0;
cur_sect=find_sect(str); cur_sect = find_sect(str);
if (!cur_sect) cur_sect=create_sect(str); if (!cur_sect)
cur_sect = create_sect(str);
continue; continue;
} }
if (!cur_sect) if (!cur_sect) {
{ fprintf(stderr, "conf_file:%s:%i: no section opened\n", fname, cur_line);
fprintf(stderr,"conf_file:%s:%i: no section opened\n",fname,cur_line);
return -1; return -1;
} }
str2=skip_word(str); str2 = skip_word(str);
if (*str2==' ') if (*str2 == ' ') {
{ *str2 = 0;
*str2=0;
++str2; ++str2;
} }
str2=skip_space(str2); str2 = skip_space(str2);
if (*str2=='=' || *str2==',') if (*str2 == '=' || *str2 == ',') {
{ *str2 = 0;
*str2=0; str2 = skip_space(str2 + 1);
str2=skip_space(str2+1); if (*str2 && *(str2 + 1) && *str2 == '$' && *(str2 + 1) == '{') {
if (*str2 && *(str2+1) && *str2=='$' && *(str2+1)=='{')
{
char *s; char *s;
struct conf_option_t *opt; struct conf_option_t *opt;
for (s=str2+2; *s && *s!='}'; s++); for (s = str2+2; *s && *s != '}'; s++);
if (*s=='}') if (*s == '}') {
{ *s = 0;
*s=0; str2 += 2;
str2+=2;
} }
opt=find_item(cur_sect,str2); opt = find_item(cur_sect, str2);
if (!opt) if (!opt) {
{ fprintf(stderr, "conf_file:%s:%i: parent option not found\n", fname, cur_line);
fprintf(stderr,"conf_file:%s:%i: parent option not found\n",fname,cur_line);
return -1; return -1;
} }
str2=opt->val; str2 = opt->val;
} }
}else str2=NULL; } else
sect_add_item(cur_sect,str,str2); str2 = NULL;
sect_add_item(cur_sect, str, str2);
} }
free(buf); free(buf);
...@@ -126,54 +113,51 @@ int conf_load(const char *fname) ...@@ -126,54 +113,51 @@ int conf_load(const char *fname)
static char* skip_space(char *str) static char* skip_space(char *str)
{ {
for (; *str && *str==' '; str++); for (; *str && *str == ' '; str++);
return str; return str;
} }
static char* skip_word(char *str) static char* skip_word(char *str)
{ {
for (; *str && (*str!=' ' && *str!='='); str++); for (; *str && (*str != ' ' && *str != '='); str++);
return str; return str;
} }
static struct conf_sect_t *find_sect(const char *name) static struct conf_sect_t *find_sect(const char *name)
{ {
struct sect_t *s; struct sect_t *s;
list_for_each_entry(s,&sections,entry) list_for_each_entry(s, &sections, entry)
{ if (strcmp(s->sect->name, name) == 0) return s->sect;
if (strcmp(s->sect->name,name)==0) return s->sect;
}
return NULL; return NULL;
} }
static struct conf_sect_t *create_sect(const char *name) static struct conf_sect_t *create_sect(const char *name)
{ {
struct sect_t *s=(struct sect_t *)malloc(sizeof(struct sect_t)); struct sect_t *s = malloc(sizeof(struct sect_t));
s->sect=(struct conf_sect_t*)malloc(sizeof(struct conf_sect_t)); s->sect = malloc(sizeof(struct conf_sect_t));
s->sect->name=(char*)strdup(name); s->sect->name = (char*)strdup(name);
INIT_LIST_HEAD(&s->sect->items); INIT_LIST_HEAD(&s->sect->items);
list_add_tail(&s->entry,&sections); list_add_tail(&s->entry, &sections);
return s->sect; return s->sect;
} }
static void sect_add_item(struct conf_sect_t *sect,const char *name,const char *val) static void sect_add_item(struct conf_sect_t *sect, const char *name, const char *val)
{ {
struct conf_option_t *opt=(struct conf_option_t *)malloc(sizeof(struct conf_option_t)); struct conf_option_t *opt = malloc(sizeof(struct conf_option_t));
opt->name=(char*)strdup(name); opt->name = strdup(name);
opt->val=val?(char*)strdup(val):NULL; opt->val = val ? strdup(val) : NULL;
list_add_tail(&opt->entry,&sect->items); list_add_tail(&opt->entry, &sect->items);
} }
static struct conf_option_t *find_item(struct conf_sect_t *sect,const char *name) static struct conf_option_t *find_item(struct conf_sect_t *sect, const char *name)
{ {
struct conf_option_t *opt; struct conf_option_t *opt;
list_for_each_entry(opt,&sect->items,entry) list_for_each_entry(opt, &sect->items, entry) {
{ if (strcmp(opt->name, name) == 0)
if (strcmp(opt->name,name)==0)
return opt; return opt;
} }
......
#include <stdio.h> #include <stdio.h>
#include <stdarg.h>
#include "triton_p.h" #include "triton_p.h"
static FILE *f_error; static FILE *f_error;
static FILE *f_debug; static FILE *f_debug;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
int log_init(void) int log_init(void)
{ {
char *log_error=conf_get_opt("core","log_error"); char *log_error = conf_get_opt("core","log_error");
char *log_debug=conf_get_opt("core","log_debug"); char *log_debug = conf_get_opt("core","log_debug");
if (log_error) if (log_error) {
{ f_error = fopen(log_error, "a");
f_error=fopen(log_error,"a"); if (!f_error) {
if (!f_error)
{
perror("log:log_error:open"); perror("log:log_error:open");
return -1; return -1;
} }
} }
if (log_debug) if (log_debug) {
{ f_debug = fopen(log_debug, "a");
f_debug=fopen(log_debug,"a"); if (!f_debug) {
if (!f_debug)
{
perror("log:log_debug:open"); perror("log:log_debug:open");
return -1; return -1;
} }
...@@ -32,3 +30,36 @@ int log_init(void) ...@@ -32,3 +30,36 @@ int log_init(void)
return 0; return 0;
} }
static void do_log(FILE *f, const char *fmt, va_list ap)
{
struct timeval tv;
struct tm tm;
char date[64];
gettimeofday(&tv, NULL);
localtime_r(&tv.tv_sec, &tm);
strftime(date, sizeof(date), "%F %H:%M:%S", &tm);
pthread_mutex_lock(&lock);
fprintf(f, "[%s.%i]", date, (int)tv.tv_usec / 1000);
vfprintf(f, fmt,ap);
pthread_mutex_unlock(&lock);
}
void triton_log_error(const char *fmt,...)
{
va_list ap;
if (!f_error)
return;
va_start(ap, fmt);
do_log(f_error, fmt, ap);
}
void triton_log_debug(const char *fmt,...)
{
va_list ap;
if (!f_debug)
return;
va_start(ap, fmt);
do_log(f_debug, fmt, ap);
}
...@@ -4,6 +4,7 @@ ...@@ -4,6 +4,7 @@
#include <signal.h> #include <signal.h>
#include <string.h> #include <string.h>
#include <errno.h> #include <errno.h>
#include <unistd.h>
#include "triton_p.h" #include "triton_p.h"
...@@ -13,74 +14,75 @@ static int epoll_fd; ...@@ -13,74 +14,75 @@ static int epoll_fd;
static struct epoll_event *epoll_events; static struct epoll_event *epoll_events;
static pthread_t md_thr; static pthread_t md_thr;
static void* md_thread(void *arg); static void *md_thread(void *arg);
int md_init(void) int md_init(void)
{ {
epoll_fd=epoll_create(1); epoll_fd = epoll_create(1);
if (epoll_fd<0) if (epoll_fd < 0) {
{ perror("md:epoll_create");
perror("epoll_create");
return -1; return -1;
} }
epoll_events=malloc(max_events * sizeof(struct epoll_event)); epoll_events = malloc(max_events * sizeof(struct epoll_event));
if (!epoll_events) if (!epoll_events) {
{ fprintf(stderr,"md:cann't allocate memory\n");
fprintf(stderr,"cann't allocate memory\n");
return -1; return -1;
} }
return 0; return 0;
} }
void md_run() void md_run(void)
{ {
pthread_create(&md_thr,NULL,md_thread,NULL); if (pthread_create(&md_thr, NULL, md_thread, NULL)) {
triton_log_error("md:pthread_create: %s", strerror(errno));
_exit(-1);
}
} }
void md_terminate() void md_terminate(void)
{ {
pthread_cancel(md_thr); pthread_cancel(md_thr);
pthread_join(md_thr,NULL); pthread_join(md_thr, NULL);
} }
static void* md_thread(void *arg) static void *md_thread(void *arg)
{ {
int i,n,r; int i,n,r;
struct triton_md_handler_t *h; struct triton_md_handler_t *h;
while(1) while(1) {
{ n = epoll_wait(epoll_fd, epoll_events, max_events, -1);
n=epoll_wait(epoll_fd,epoll_events,max_events,-1); if (n < 0) {
if (n<0) if (errno == EINTR)
{ continue;
if (errno!=EINTR) triton_log_error("md:epoll_wait: %s", strerror(errno));
perror("epoll_wait"); _exit(-1);
continue;
} }
for(i=0; i<n; i++) for(i = 0; i < n; i++) {
{ h = (struct triton_md_handler_t *)epoll_events[i].data.ptr;
h=(struct triton_md_handler_t*)epoll_events[i].data.ptr;
spin_lock(&h->ctx->lock); spin_lock(&h->ctx->lock);
h->trig_epoll_events=epoll_events[i].events; h->trig_epoll_events = epoll_events[i].events;
list_add_tail(&h->entry2,&h->ctx->pending_handlers); list_add_tail(&h->entry2, &h->ctx->pending_handlers);
h->pending=1; h->pending = 1;
r=triton_queue_ctx(h->ctx); r=triton_queue_ctx(h->ctx);
spin_unlock(&h->ctx->lock); spin_unlock(&h->ctx->lock);
if (r) if (r)
triton_thread_wakeup(h->ctx->thread); triton_thread_wakeup(h->ctx->thread);
} }
} }
return NULL;
} }
void triton_md_register_handler(struct triton_md_handler_t *h) void triton_md_register_handler(struct triton_md_handler_t *h)
{ {
h->epoll_event.data.ptr=h; h->epoll_event.data.ptr = h;
if (!h->ctx) if (!h->ctx)
h->ctx=default_ctx; h->ctx = default_ctx;
spin_lock(&h->ctx->lock); spin_lock(&h->ctx->lock);
list_add_tail(&h->entry,&h->ctx->handlers); list_add_tail(&h->entry, &h->ctx->handlers);
spin_unlock(&h->ctx->lock); spin_unlock(&h->ctx->lock);
} }
void triton_md_unregister_handler(struct triton_md_handler_t *h) void triton_md_unregister_handler(struct triton_md_handler_t *h)
...@@ -94,42 +96,47 @@ void triton_md_unregister_handler(struct triton_md_handler_t *h) ...@@ -94,42 +96,47 @@ void triton_md_unregister_handler(struct triton_md_handler_t *h)
int triton_md_enable_handler(struct triton_md_handler_t *h, int mode) int triton_md_enable_handler(struct triton_md_handler_t *h, int mode)
{ {
int r; int r;
int events=h->epoll_event.events; int events = h->epoll_event.events;
if (mode&MD_MODE_READ) if (mode & MD_MODE_READ)
h->epoll_event.events|=EPOLLIN; h->epoll_event.events |= EPOLLIN;
if (mode&MD_MODE_WRITE) if (mode & MD_MODE_WRITE)
h->epoll_event.events|=EPOLLOUT; h->epoll_event.events |= EPOLLOUT;
h->epoll_event.events|=EPOLLET; h->epoll_event.events |= EPOLLET;
if (events) if (events)
r=epoll_ctl(epoll_fd,EPOLL_CTL_MOD,h->fd,&h->epoll_event); r = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, h->fd, &h->epoll_event);
else else
r=epoll_ctl(epoll_fd,EPOLL_CTL_ADD,h->fd,&h->epoll_event); r = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, h->fd, &h->epoll_event);
if (r)
triton_log_error("md:epoll_ctl: %s",strerror(errno));
return r; return r;
} }
int triton_md_disable_handler(struct triton_md_handler_t *h,int mode) int triton_md_disable_handler(struct triton_md_handler_t *h,int mode)
{ {
int r; int r=0;
if (!h->epoll_event.events) if (!h->epoll_event.events)
return -1; return -1;
if (mode&MD_MODE_READ) if (mode & MD_MODE_READ)
h->epoll_event.events&=~EPOLLIN; h->epoll_event.events &= ~EPOLLIN;
if (mode&MD_MODE_WRITE) if (mode & MD_MODE_WRITE)
h->epoll_event.events&=~EPOLLOUT; h->epoll_event.events &= ~EPOLLOUT;
if (h->epoll_event.events&(EPOLLIN|EPOLLOUT)) if (h->epoll_event.events & (EPOLLIN | EPOLLOUT))
r=epoll_ctl(epoll_fd,EPOLL_CTL_MOD,h->fd,&h->epoll_event); r = epoll_ctl(epoll_fd, EPOLL_CTL_MOD, h->fd, &h->epoll_event);
else else {
{ h->epoll_event.events = 0;
h->epoll_event.events=0; r = epoll_ctl(epoll_fd, EPOLL_CTL_DEL, h->fd, NULL);
r=epoll_ctl(epoll_fd,EPOLL_CTL_DEL,h->fd,NULL);
} }
if (r)
triton_log_error("md:epoll_ctl: %s",strerror(errno));
return r; return r;
} }
...@@ -3,9 +3,10 @@ ...@@ -3,9 +3,10 @@
#ifdef USE_SPINLOCK #ifdef USE_SPINLOCK
typedef unsigned char spinlock_t; typedef unsigned char spinlock_t;
#define spin_lock(l) {while(__sync_lock_test_and_set(l,1);} #define spin_lock(l) {while(__sync_lock_test_and_set(l,1));}
#define spin_unlock(l) __sync_lock_release(l) #define spin_unlock(l) __sync_lock_release(l)
#define SPINLOCK_INITIALIZER 0 #define SPINLOCK_INITIALIZER 0
#define spinlock_init(l) {*(l)=0;}
#else #else
#include <pthread.h> #include <pthread.h>
typedef pthread_mutex_t spinlock_t; typedef pthread_mutex_t spinlock_t;
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#include "triton_p.h" #include "triton_p.h"
int max_events=128; extern int max_events;
static int epoll_fd; static int epoll_fd;
static struct epoll_event *epoll_events; static struct epoll_event *epoll_events;
...@@ -18,17 +18,15 @@ static void *timer_thread(void *arg); ...@@ -18,17 +18,15 @@ static void *timer_thread(void *arg);
int timer_init(void) int timer_init(void)
{ {
epoll_fd=epoll_create(1); epoll_fd = epoll_create(1);
if (epoll_fd<0) if (epoll_fd < 0) {
{ perror("timer:epoll_create");
perror("epoll_create");
return -1; return -1;
} }
epoll_events=malloc(max_events * sizeof(struct epoll_event)); epoll_events = malloc(max_events * sizeof(struct epoll_event));
if (!epoll_events) if (!epoll_events) {
{ fprintf(stderr,"timer:cann't allocate memory\n");
fprintf(stderr,"cann't allocate memory\n");
return -1; return -1;
} }
...@@ -37,13 +35,16 @@ int timer_init(void) ...@@ -37,13 +35,16 @@ int timer_init(void)
void timer_run(void) void timer_run(void)
{ {
pthread_create(&timer_thr,NULL,timer_thread,NULL); if (pthread_create(&timer_thr, NULL, timer_thread, NULL)) {
triton_log_error("timer:pthread_create: %s",strerror(errno));
_exit(-1);
}
} }
void timer_terminate(void) void timer_terminate(void)
{ {
pthread_cancel(timer_thr); pthread_cancel(timer_thr);
pthread_join(timer_thr,NULL); pthread_join(timer_thr, NULL);
} }
void *timer_thread(void *arg) void *timer_thread(void *arg)
...@@ -51,56 +52,53 @@ void *timer_thread(void *arg) ...@@ -51,56 +52,53 @@ void *timer_thread(void *arg)
int i,n,r; int i,n,r;
struct triton_timer_t *t; struct triton_timer_t *t;
while(1) while(1) {
{ n = epoll_wait(epoll_fd, epoll_events, max_events, -1);
n=epoll_wait(epoll_fd,epoll_events,max_events,-1); if (n < 0) {
if (n<0) if (errno == EINTR)
{ continue;
if (errno!=EINTR) triton_log_error("timer:epoll_wait: %s", strerror(errno));
perror("epoll_wait"); _exit(-1);
continue;
} }
for(i=0; i<n; i++) for(i = 0; i < n; i++) {
{ t = (struct triton_timer_t *)epoll_events[i].data.ptr;
t=(struct triton_timer_t*)epoll_events[i].data.ptr;
spin_lock(&t->ctx->lock); spin_lock(&t->ctx->lock);
list_add_tail(&t->entry2,&t->ctx->pending_timers); list_add_tail(&t->entry2, &t->ctx->pending_timers);
t->pending=1; t->pending = 1;
r=triton_queue_ctx(t->ctx); r=triton_queue_ctx(t->ctx);
spin_unlock(&t->ctx->lock); spin_unlock(&t->ctx->lock);
if (r) if (r)
triton_thread_wakeup(t->ctx->thread); triton_thread_wakeup(t->ctx->thread);
} }
} }
return NULL;
} }
int triton_timer_add(struct triton_timer_t *t, int abs_time) int triton_timer_add(struct triton_timer_t *t, int abs_time)
{ {
t->epoll_event.data.ptr=t; t->epoll_event.data.ptr = t;
t->epoll_event.events=EPOLLIN|EPOLLET; t->epoll_event.events = EPOLLIN | EPOLLET;
if (!t->ctx) if (!t->ctx)
t->ctx=default_ctx; t->ctx = default_ctx;
t->fd=timerfd_create(CLOCK_MONOTONIC,TFD_NONBLOCK); t->fd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
if (t->fd<0) if (t->fd < 0) {
{ triton_log_error("timer:timerfd_create: %s" ,strerror(errno));
fprintf(stderr,"timer: timerfd_create failed: %s\n",strerror(errno));
return -1; return -1;
} }
if (triton_timer_mod(t,abs_time)) if (triton_timer_mod(t, abs_time)) {
{
close(t->fd); close(t->fd);
return -1; return -1;
} }
spin_lock(&t->ctx->lock); spin_lock(&t->ctx->lock);
list_add_tail(&t->entry,&t->ctx->timers); list_add_tail(&t->entry, &t->ctx->timers);
spin_unlock(&t->ctx->lock); spin_unlock(&t->ctx->lock);
if (epoll_ctl(epoll_fd,EPOLL_CTL_ADD,t->fd,&t->epoll_event)) if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, t->fd, &t->epoll_event)) {
{ triton_log_error("timer:epoll_ctl: %s", strerror(errno));
fprintf(stderr,"timer: epoll_ctl failed: %s\n",strerror(errno));
spin_lock(&t->ctx->lock); spin_lock(&t->ctx->lock);
list_del(&t->entry); list_del(&t->entry);
spin_unlock(&t->ctx->lock); spin_unlock(&t->ctx->lock);
...@@ -112,20 +110,18 @@ int triton_timer_add(struct triton_timer_t *t, int abs_time) ...@@ -112,20 +110,18 @@ int triton_timer_add(struct triton_timer_t *t, int abs_time)
} }
int triton_timer_mod(struct triton_timer_t *t,int abs_time) int triton_timer_mod(struct triton_timer_t *t,int abs_time)
{ {
struct itimerspec ts= struct itimerspec ts = {
{ .it_value.tv_sec = t->expire_tv.tv_sec,
.it_value.tv_sec=t->expire_tv.tv_sec, .it_value.tv_nsec = t->expire_tv.tv_usec * 1000,
.it_value.tv_nsec=t->expire_tv.tv_usec*1000, .it_interval.tv_sec = t->period / 1000,
.it_interval.tv_sec=t->period/1000, .it_interval.tv_nsec = t->period % 1000 * 1000,
.it_interval.tv_nsec=t->period%1000*1000,
}; };
if (t->expire_tv.tv_sec==0 && t->expire_tv.tv_usec==0) if (t->expire_tv.tv_sec == 0 && t->expire_tv.tv_usec == 0)
ts.it_value=ts.it_interval; ts.it_value = ts.it_interval;
if (timerfd_settime(t->fd,abs_time?TFD_TIMER_ABSTIME:0,&ts,NULL)) if (timerfd_settime(t->fd, abs_time ? TFD_TIMER_ABSTIME : 0, &ts, NULL)) {
{ triton_log_error("timer:timerfd_settime: %s", strerror(errno));
fprintf(stderr,"timer: timerfd_settime failed: %s\n",strerror(errno));
return -1; return -1;
} }
...@@ -133,7 +129,7 @@ int triton_timer_mod(struct triton_timer_t *t,int abs_time) ...@@ -133,7 +129,7 @@ int triton_timer_mod(struct triton_timer_t *t,int abs_time)
} }
void triton_timer_del(struct triton_timer_t *t) void triton_timer_del(struct triton_timer_t *t)
{ {
epoll_ctl(epoll_fd,EPOLL_CTL_DEL,t->fd,&t->epoll_event); epoll_ctl(epoll_fd, EPOLL_CTL_DEL, t->fd, &t->epoll_event);
close(t->fd); close(t->fd);
spin_lock(&t->ctx->lock); spin_lock(&t->ctx->lock);
list_del(&t->entry); list_del(&t->entry);
......
...@@ -3,18 +3,20 @@ ...@@ -3,18 +3,20 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <unistd.h>
#include "triton_p.h" #include "triton_p.h"
int thread_count=64; int thread_count = 64;
int max_events = 64;
static spinlock_t threads_lock=SPINLOCK_INITIALIZER; static spinlock_t threads_lock = SPINLOCK_INITIALIZER;
static LIST_HEAD(threads); static LIST_HEAD(threads);
static LIST_HEAD(sleep_threads); static LIST_HEAD(sleep_threads);
static LIST_HEAD(ctx_queue); static LIST_HEAD(ctx_queue);
static spinlock_t ctx_list_lock=SPINLOCK_INITIALIZER; static spinlock_t ctx_list_lock = SPINLOCK_INITIALIZER;
static LIST_HEAD(ctx_list); static LIST_HEAD(ctx_list);
struct triton_ctx_t *default_ctx; struct triton_ctx_t *default_ctx;
...@@ -22,7 +24,7 @@ static int terminate; ...@@ -22,7 +24,7 @@ static int terminate;
void triton_thread_wakeup(struct triton_thread_t *thread) void triton_thread_wakeup(struct triton_thread_t *thread)
{ {
pthread_kill(thread->thread,SIGUSR1); pthread_kill(thread->thread, SIGUSR1);
} }
static void* triton_thread(struct triton_thread_t *thread) static void* triton_thread(struct triton_thread_t *thread)
...@@ -33,72 +35,65 @@ static void* triton_thread(struct triton_thread_t *thread) ...@@ -33,72 +35,65 @@ static void* triton_thread(struct triton_thread_t *thread)
int sig; int sig;
sigemptyset(&set); sigemptyset(&set);
sigaddset(&set,SIGUSR1); sigaddset(&set, SIGUSR1);
sigaddset(&set,SIGQUIT); sigaddset(&set, SIGQUIT);
while(1) while(1){
{ sigwait(&set, &sig);
sigwait(&set,&sig);
cont: cont:
if (thread->ctx->need_close) if (thread->ctx->need_close) {
{
thread->ctx->close(thread->ctx); thread->ctx->close(thread->ctx);
thread->ctx->need_close=0; thread->ctx->need_close = 0;
} }
while (1) while (1) {
{
spin_lock(&thread->ctx->lock); spin_lock(&thread->ctx->lock);
if (!list_empty(&thread->ctx->pending_timers)) if (!list_empty(&thread->ctx->pending_timers)) {
{ t = list_entry(thread->ctx->pending_timers.next, typeof(*t), entry2);
t=list_entry(thread->ctx->pending_timers.next,typeof(*t),entry2);
list_del(&t->entry2); list_del(&t->entry2);
spin_unlock(&thread->ctx->lock); spin_unlock(&thread->ctx->lock);
if (t->expire(t)) if (t->expire(t))
continue; continue;
} }
if (!list_empty(&thread->ctx->pending_handlers)) if (!list_empty(&thread->ctx->pending_handlers)) {
{ h = list_entry(thread->ctx->pending_handlers.next, typeof(*h), entry2);
h=list_entry(thread->ctx->pending_handlers.next,typeof(*h),entry2);
list_del(&h->entry2); list_del(&h->entry2);
h->pending=0; h->pending = 0;
spin_unlock(&thread->ctx->lock); spin_unlock(&thread->ctx->lock);
if (h->trig_epoll_events&(EPOLLIN|EPOLLERR|EPOLLHUP)) if (h->trig_epoll_events & (EPOLLIN | EPOLLERR | EPOLLHUP))
if (h->read) if (h->read)
if (h->read(h)) if (h->read(h))
continue; continue;
if (h->trig_epoll_events&(EPOLLOUT)) if (h->trig_epoll_events & EPOLLOUT)
if (h->write) if (h->write)
if (h->write(h)) if (h->write(h))
continue; continue;
h->trig_epoll_events=0; h->trig_epoll_events = 0;
continue; continue;
} }
thread->ctx->thread=NULL; thread->ctx->thread = NULL;
spin_unlock(&thread->ctx->lock); spin_unlock(&thread->ctx->lock);
if (thread->ctx->need_free) if (thread->ctx->need_free)
thread->ctx->free(thread->ctx); thread->ctx->free(thread->ctx);
thread->ctx=NULL; thread->ctx = NULL;
break; break;
} }
spin_lock(&threads_lock); spin_lock(&threads_lock);
if (!list_empty(&ctx_queue)) if (!list_empty(&ctx_queue)) {
{ thread->ctx = list_entry(ctx_queue.next, typeof(*thread->ctx), entry2);
thread->ctx=list_entry(ctx_queue.next,typeof(*thread->ctx),entry2);
list_del(&thread->ctx->entry2); list_del(&thread->ctx->entry2);
spin_unlock(&threads_lock); spin_unlock(&threads_lock);
spin_lock(&thread->ctx->lock); spin_lock(&thread->ctx->lock);
thread->ctx->thread=thread; thread->ctx->thread = thread;
thread->ctx->queued=0; thread->ctx->queued = 0;
spin_unlock(&thread->ctx->lock); spin_unlock(&thread->ctx->lock);
goto cont; goto cont;
}else } else {
{
if (!terminate) if (!terminate)
list_add(&thread->entry2,&sleep_threads); list_add(&thread->entry2, &sleep_threads);
spin_unlock(&threads_lock); spin_unlock(&threads_lock);
if (terminate) if (terminate)
return NULL; return NULL;
...@@ -108,10 +103,15 @@ cont: ...@@ -108,10 +103,15 @@ cont:
struct triton_thread_t *create_thread() struct triton_thread_t *create_thread()
{ {
struct triton_thread_t *thread=malloc(sizeof(*thread)); struct triton_thread_t *thread = malloc(sizeof(*thread));
if (!thread)
memset(thread,0,sizeof(*thread)); return NULL;
pthread_create(&thread->thread,NULL,(void*(*)(void*))triton_thread,thread);
memset(thread, 0, sizeof(*thread));
if (pthread_create(&thread->thread, NULL, (void*(*)(void*))triton_thread, thread)) {
triton_log_error("pthread_create: %s", strerror(errno));
return NULL;
}
return thread; return thread;
} }
...@@ -122,15 +122,14 @@ int triton_queue_ctx(struct triton_ctx_t *ctx) ...@@ -122,15 +122,14 @@ int triton_queue_ctx(struct triton_ctx_t *ctx)
return 0; return 0;
spin_lock(&threads_lock); spin_lock(&threads_lock);
if (list_empty(&sleep_threads)) if (list_empty(&sleep_threads)) {
{ list_add_tail(&ctx->entry2, &ctx_queue);
list_add_tail(&ctx->entry2,&ctx_queue);
spin_unlock(&threads_lock); spin_unlock(&threads_lock);
ctx->queued=1; ctx->queued = 1;
return 0; return 0;
} }
ctx->thread=list_entry(sleep_threads.next,typeof(*ctx->thread),entry2); ctx->thread = list_entry(sleep_threads.next, typeof(*ctx->thread), entry2);
list_del(&ctx->thread->entry2); list_del(&ctx->thread->entry2);
spin_unlock(&threads_lock); spin_unlock(&threads_lock);
...@@ -146,13 +145,13 @@ void triton_register_ctx(struct triton_ctx_t *ctx) ...@@ -146,13 +145,13 @@ void triton_register_ctx(struct triton_ctx_t *ctx)
INIT_LIST_HEAD(&ctx->pending_timers); INIT_LIST_HEAD(&ctx->pending_timers);
spin_lock(&ctx_list_lock); spin_lock(&ctx_list_lock);
list_add_tail(&ctx->entry,&ctx_list); list_add_tail(&ctx->entry, &ctx_list);
spin_unlock(&ctx_list_lock); spin_unlock(&ctx_list_lock);
} }
void triton_unregister_ctx(struct triton_ctx_t *ctx) void triton_unregister_ctx(struct triton_ctx_t *ctx)
{ {
ctx->need_free=1; ctx->need_free = 1;
spin_lock(&ctx_list_lock); spin_lock(&ctx_list_lock);
list_del(&ctx->entry); list_del(&ctx->entry);
spin_unlock(&ctx_list_lock); spin_unlock(&ctx_list_lock);
...@@ -161,8 +160,7 @@ void triton_unregister_ctx(struct triton_ctx_t *ctx) ...@@ -161,8 +160,7 @@ void triton_unregister_ctx(struct triton_ctx_t *ctx)
int triton_init(const char *conf_file) int triton_init(const char *conf_file)
{ {
default_ctx=malloc(sizeof(*default_ctx)); default_ctx=malloc(sizeof(*default_ctx));
if (!default_ctx) if (!default_ctx) {
{
fprintf(stderr,"cann't allocate memory\n"); fprintf(stderr,"cann't allocate memory\n");
return -1; return -1;
} }
...@@ -188,11 +186,13 @@ void triton_run() ...@@ -188,11 +186,13 @@ void triton_run()
struct triton_thread_t *t; struct triton_thread_t *t;
int i; int i;
for(i=0;i<thread_count;i++) for(i = 0; i < thread_count; i++) {
{ t = create_thread();
t=create_thread(); if (!t)
list_add_tail(&t->entry,&threads); _exit(-1);
list_add_tail(&t->entry2,&sleep_threads);
list_add_tail(&t->entry, &threads);
list_add_tail(&t->entry2, &sleep_threads);
} }
md_run(); md_run();
...@@ -208,23 +208,22 @@ void triton_terminate() ...@@ -208,23 +208,22 @@ void triton_terminate()
timer_terminate(); timer_terminate();
spin_lock(&ctx_list_lock); spin_lock(&ctx_list_lock);
list_for_each_entry(ctx,&ctx_list,entry) list_for_each_entry(ctx, &ctx_list, entry) {
{
spin_lock(&ctx->lock); spin_lock(&ctx->lock);
ctx->need_close=1; ctx->need_close = 1;
triton_queue_ctx(ctx); triton_queue_ctx(ctx);
spin_unlock(&ctx->lock); spin_unlock(&ctx->lock);
} }
spin_unlock(&ctx_list_lock); spin_unlock(&ctx_list_lock);
spin_lock(&threads_lock); spin_lock(&threads_lock);
terminate=1; terminate = 1;
spin_unlock(&threads_lock); spin_unlock(&threads_lock);
list_for_each_entry(t,&threads,entry) list_for_each_entry(t, &threads, entry)
triton_thread_wakeup(t); triton_thread_wakeup(t);
list_for_each_entry(t,&threads,entry) list_for_each_entry(t, &threads, entry)
pthread_join(t->thread,NULL); pthread_join(t->thread, NULL);
} }
...@@ -15,5 +15,7 @@ struct triton_ctx_t *default_ctx; ...@@ -15,5 +15,7 @@ struct triton_ctx_t *default_ctx;
int triton_queue_ctx(struct triton_ctx_t*); int triton_queue_ctx(struct triton_ctx_t*);
void triton_thread_wakeup(struct triton_thread_t*); void triton_thread_wakeup(struct triton_thread_t*);
int conf_load(const char *fname); int conf_load(const char *fname);
void triton_log_error(const char *fmt,...);
void triton_log_debug(const char *fmt,...);
#endif #endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册