提交 14763d00 编写于 作者: K Kozlov Dmitry

rewriting triton library...

上级 6f071c1e
#include "triton/triton.h"
#include "log.h"
void sigterm(int num)
{
triton_terminate();
}
int main(int argc,char **argv)
{
sigset_t set;
log_init(stdout,4,0);
triton_init();
triton_run();
signal(SIGTERM,sigterm);
sigfillset(&set);
sigdelset(&set, SIGTERM);
sigdelset(&set, SIGSEGV);
sigdelset(&set, SIGILL);
sigdelset(&set, SIGFPE);
sigdelset(&set, SIGBUS);
sigsuspend(&set);
return EXIT_SUCCESS;
}
......@@ -38,6 +38,7 @@
struct pptp_conn_t
{
struct triton_ctx_t ctx;
struct triton_md_handler_t hnd;
int state;
......@@ -397,3 +398,91 @@ static void ppp_finished(struct ppp_t *ppp)
conn->state=STATE_FIN;
conn->hnd.twait=1000;
}
//==================================
static int pptp_connect(struct triton_md_handler_t *h)
{
struct sockaddr_in addr;
socklen_t size=sizeof(addr);
int sock;
struct pptp_conn_t *conn;
while(1)
{
sock=accept(f->fd,(struct sockaddr *)&addr,&size);
if (sock<0)
{
if (errno==EAGAIN)
return 0;
log_error("pptp: accept failed\n");
continue;
}
conn=malloc(sizeof(*conn));
memset(conn,0,sizeof(*conn));
conn->hnd.fd=fd;
conn->hnd.read=pptp_read;
conn->hnd.write=pptp_write;
conn->hnd.close=pptp_close;
conn->hnd.ctx=&conn->ctx;
conn->in_buf=malloc(PPTP_CTRL_SIZE_MAX);
conn->out_buf=malloc(PPTP_CTRL_SIZE_MAX);
triton_register_ctx(&conn->ctx);
triton_md_register_handler(&conn->hnd);
triton_md_enable_handler(&conn->hnd,MD_MODE_READ);
}
}
static void pptp_serv_close(struct triton_md_handler_t *h)
{
triton_md_unregister_handler(h);
close(h->fd);
}
struct pptp_serv_t
{
struct triton_context_t ctx;
struct triton_md_handler_t hnd;
};
static struct pptp_serv_t serv=
{
.hnd.read=pptp_connect,
.hnd.close=pptp_serv_close,
.hnd.ctx=&serv.ctx,
};
void __constructor pptp_init()
{
struct sockaddr_in addr;
socklen_t size;
serv.hnd.fd=socket (PF_INET, SOCK_STREAM, 0);
if (serv.hnd.fd<0)
{
log_error("pptp: failed to create server socket\n");
return;
}
addr.sin_family = AF_INET;
addr.sin_port = htons (PPTP_PORT);
addr.sin_addr.s_addr = htonl (INADDR_ANY);
if (bind (serv.hnd.fd, (struct sockaddr *) &addr, sizeof (addr)) < 0)
{
perror("pptp: bind");
log_error("pptp: failed to bind socket\n");
close(serv.hnd.fd);
return;
}
if (listen (serv.hnd.fd, 100)<0)
{
log_error("pptp: failed to listen socket\n");
close(serv.hnd.fd);
return -1;
}
triton_register_ctx(&serv.ctx);
triton_md_register_handler(&serv.hnd);
triton_md_enable_handler(&serv.hnd,MD_MODE_READ);
}
/*
* C Implementation: pptpd
*
* Description:
*
*
* Author: <xeb@mail.ru>, (C) 2009
*
* Copyright: See COPYING file that comes with this distribution
*
*/
#include <unistd.h>
#include <stdlib.h>
#include <stdio.h>
#include <pthread.h>
#include <stdarg.h>
#include <errno.h>
#include <string.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <netinet/in.h>
#include "list.h"
#include "pptp_prot.h"
#include "triton/triton.h"
#include "pptpd.h"
#include "log.h"
static struct ctrl_thread_t *threads=NULL;
static int threads_count=0;
int start_server(void)
{
int sock,c_sock;
int r,min_thr,min_cnt;
struct pollfd pfd;
struct sockaddr_in addr;
socklen_t size;
sock=socket (PF_INET, SOCK_STREAM, 0);
if (sock<0)
{
log_error("failed to create socket\n");
return -1;
}
addr.sin_family = AF_INET;
addr.sin_port = htons (PPTP_PORT);
addr.sin_addr.s_addr = htonl (INADDR_ANY);
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &sock, sizeof(sock));
if (bind (sock, (struct sockaddr *) &addr, sizeof (addr)) < 0)
{
perror("bind");
log_error("failed to bind socket\n");
return -1;
}
if (listen (sock, 1000)<0)
{
log_error("failed to listen socket\n");
return -1;
}
pfd.fd=sock;
pfd.events=POLLIN;
while(1)
{
r=poll(&pfd,1,-1);
if (r<0 && errno!=EINTR)
{
log_error("poll failed\n");
return -2;
}
if (r<=0) continue;
if (!(pfd.revents&POLLIN)) continue;
size=sizeof(addr);
c_sock=accept(sock,(struct sockaddr *)&addr,&size);
if (c_sock<0)
{
log_error("client accept failed\n");
continue;
}
min_thr=0; min_cnt=65536;
for(r=0; r<threads_count; r++)
{
pthread_mutex_lock(&threads[r].lock);
if (threads[r].count<min_cnt)
{
min_cnt=threads[r].count;
min_thr=r;
}
pthread_mutex_unlock(&threads[r].lock);
}
write(threads[min_thr].pipe_fd[1],&c_sock,sizeof(c_sock));
}
}
int start_threads(int cnt)
{
int i;
if (!cnt) cnt=sysconf(_SC_NPROCESSORS_CONF);
threads=malloc(cnt*sizeof(*threads));
memset(threads,0,cnt*sizeof(*threads));
for(i=0; i<cnt; i++)
{
//threads[i].lock=PTHREAD_MUTEX_INITIALIZER;
if (pipe(threads[i].pipe_fd))
{
log_error("failed to create pipe\n");
return -1;
}
if (triton_run((int(*)(void*))ctrl_init,&threads[i]))
{
log_error("triton_run failed\n");
return -1;
}
}
threads_count=cnt;
return 0;
}
int main(int argc,char **argv)
{
log_init(stdout,4,0);
start_threads(0);
start_server();
return EXIT_SUCCESS;
}
......@@ -2,8 +2,6 @@ SET(target triton)
SET(sources_c
md.c
conf_file.c
coroutine.c
event.c
timer.c
options.c
loader.c
......
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <signal.h>
#include <string.h>
#include "triton_p.h"
#ifdef USE_CORO
static LIST_HEAD(coroutines);
asm(".hidden current_coro");
struct coroutine_t *current_coro=NULL;
//asm(".hidden sched_uc");
static ucontext_t sched_uc;
asm(".hidden schedule");
void schedule(void)
{
struct coroutine_t *coro;
struct list_head *p;
while(1)
{
current_coro=NULL;
for(p=coroutines.next; p!=&coroutines; p=p->next)
{
coro=list_entry(p,typeof(*current_coro),entry);
if (coro->time.tv_sec)
{
if (!current_coro) current_coro=coro;
else if (coro->time.tv_sec<current_coro->time.tv_sec) continue;
else if (coro->time.tv_sec>current_coro->time.tv_sec || coro->time.tv_usec>current_coro->time.tv_usec) current_coro=coro;
}
}
if (current_coro)
{
get_time(&current_coro->time);
swapcontext(&sched_uc,&current_coro->uc);
//break;
}else
{
printf("triton: coroutine: bug: no current coro !!!\n");
exit(-1);
}
}
}
void coroutine_init(void)
{
getcontext(&sched_uc);
sched_uc.uc_stack.ss_sp=malloc(DEF_COROUTINE_STACK);
sched_uc.uc_stack.ss_size=DEF_COROUTINE_STACK;
makecontext(&sched_uc,schedule,0);
}
void triton_coroutine_schedule()
{
memset(&current_coro->time,0,sizeof(current_coro->time));
memset(&current_coro->timeout,0,sizeof(current_coro->timeout));
swapcontext(&current_coro->uc,&sched_uc);
}
long int triton_coroutine_create(int stack_size,triton_coroutine_func func,void *arg,int run)
{
struct coroutine_t *coro=malloc(sizeof(*coro));
memset(coro,0,sizeof(*coro));
if (!stack_size) stack_size=DEF_COROUTINE_STACK;//+SIGSTKSZ;
getcontext(&coro->uc);
coro->uc.uc_link=&sched_uc;
coro->uc.uc_stack.ss_sp=malloc(stack_size);
coro->uc.uc_stack.ss_size=stack_size;
makecontext(&coro->uc,(void (*)(void))func,1,arg);
if (run) coro->time.tv_sec=1;
list_add(&coro->entry,&coroutines);
return (long int)coro;
}
void triton_coroutine_delete(long int id)
{
struct coroutine_t *coro=(struct coroutine_t *)id;
list_del(&coro->entry);
free(coro->uc.uc_stack.ss_sp);
}
int triton_coroutine_schedule_timeout(int msec)
{
//current_coro->msleep=msec;
struct timeval tv;
int t;
get_time(&current_coro->timeout);
current_coro->timeout.tv_sec+=msec/1000;
current_coro->timeout.tv_usec+=(msec%1000)*1000;
if (current_coro->timeout.tv_usec>=1000000)
{
current_coro->timeout.tv_sec++;
current_coro->timeout.tv_usec-=1000000;
}
//triton_coroutine_schedule();
memset(&current_coro->time,0,sizeof(current_coro->time));
//memset(&current_coro->timeout,0,sizeof(current_coro->timeout));
swapcontext(&current_coro->uc,&sched_uc);
get_time(&tv);
t=(current_coro->timeout.tv_sec-tv.tv_sec)*1000+(current_coro->timeout.tv_usec-tv.tv_usec)/1000;
if (t<0) t=0;
return t;
}
void triton_coroutine_wakeup(long int id)
{
struct coroutine_t *coro=(struct coroutine_t *)id;
struct coroutine_t *cur_coro=current_coro;
get_time(&current_coro->time);
current_coro=coro;
swapcontext(&cur_coro->uc,&coro->uc);
}
asm(".hidden coroutine_get_timeout");
int coroutine_get_timeout(struct timeval *tv)
{
struct coroutine_t *coro;
struct list_head *p;
int twait,t=-1;
for(p=coroutines.next; p!=&coroutines; p=p->next)
{
coro=list_entry(p,typeof(*coro),entry);
if (coro->timeout.tv_sec)
{
twait=(coro->timeout.tv_sec-tv->tv_sec)*1000+(coro->timeout.tv_usec-tv->tv_usec)/1000;
if (t==-1 || twait<t) t=twait;
}
}
return t;
}
asm(".hidden coroutine_check_timeout");
void coroutine_check_timeout(struct timeval *tv)
{
struct coroutine_t *coro;
struct list_head *p;
for(p=coroutines.next; p!=&coroutines;)
{
coro=list_entry(p,typeof(*coro),entry);
p=p->next;
if (coro->timeout.tv_sec && (tv->tv_sec>coro->timeout.tv_sec || (tv->tv_sec==coro->timeout.tv_sec && tv->tv_usec>=coro->timeout.tv_usec)))
{
triton_coroutine_wakeup((long int)coro);
}
}
}
#endif
#include <stdio.h>
#include <stdlib.h>
#include <stdarg.h>
#include <string.h>
#include "triton_p.h"
#define EVENTS1_SIZE 1024
static __thread struct list_head events2;
static __thread struct event_t *events1;
static __thread long *args;
asm(".hidden event_init");
static struct event_t *find_event(int ev_id);
static struct event_t *create_event(int ev_id);
void event_init(void)
{
int i;
args=malloc(MAX_ARGS*sizeof(long));
events1=(struct event_t *)malloc(EVENTS1_SIZE*sizeof(struct event_t));
for (i=0; i<EVENTS1_SIZE; i++)
{
events1[i].ev_id=i;
INIT_LIST_HEAD(&events1[i].handlers);
}
INIT_LIST_HEAD(&events2);
}
void triton_event_register_handler(int ev_id,triton_event_func func,int arg_cnt,...)
{
struct event_t *ev;
struct event_handler_t *ev_h;
ev=find_event(ev_id);
if (!ev)
ev=create_event(ev_id);
ev_h=(struct event_handler_t*)malloc(sizeof(struct event_handler_t));
memset(ev_h,0,sizeof(*ev_h));
ev_h->event_func=func;
if (arg_cnt)
{
va_list p;
va_start(p,arg_cnt);
ev_h->arg_cnt=arg_cnt;
ev_h->args=malloc(arg_cnt*sizeof(long));
#ifdef BROKEN_GCC
for(i=0; i<arg_cnt; i++)
*((int*)ev_h->args+i)=va_arg(p,long);
#else
memcpy(ev_h->args,p,arg_cnt*sizeof(long));
#endif
va_end(p);
}
list_add_tail(&ev_h->entry,&ev->handlers);
}
void triton_event_unregister_handler(int ev_id,triton_event_func func)
{
struct event_t *ev;
struct event_handler_t *ev_h;
ev=find_event(ev_id);
if (!ev)
return;
list_for_each_entry(ev_h,&ev->handlers,entry)
{
if (ev_h->event_func==func)
{
list_del(&ev_h->entry);
if (ev_h->args) free(ev_h->args);
free(ev_h);
if (list_empty(&ev->handlers) && ev_id>=EVENTS1_SIZE)
{
list_del(&ev->entry);
free(ev);
}
return;
}
}
}
/*#define dyn_call(func,arg_cnt,args)\
asm("movl %%esp,%%edi;\n\
movl %0,%%esi;\n\
movl %1,%%ecx;\n\
cld;\n\
rep movsl;\n\
call *%2;\n"::"m" (args),"m" (arg_cnt),"m" (func):"%edi","%esi","%ecx");*/
void triton_event_fire(int ev_id,int arg_cnt,...)
{
struct event_t *ev;
struct event_handler_t *ev_h;
struct list_head *p1,*p2;
va_list p;
//void *args_p=&args;
//char pp[ARG_OFFSET+MAX_ARGS*sizeof(int)];
//memcpy(pp,__builtin_apply_args(),ARG_OFFSET);
ev=find_event(ev_id);
if (!ev)
return;
list_for_each_safe(p1,p2,&ev->handlers)
{
ev_h=list_entry(p1,struct event_handler_t,entry);
if (ev_h->arg_cnt) memcpy(args,ev_h->args,ev_h->arg_cnt*sizeof(long));
va_start(p,arg_cnt);
#ifdef BROKEN_GCC
for(i=0; i<arg_cnt; i++)
args[ev_h->arg_cnt+i]=va_arg(p,long);
#else
memcpy(args+ev_h->arg_cnt,p,arg_cnt*sizeof(long));
#endif
//memcpy(pp+ARG_OFFSET,args,(ev_h->arg_cnt+arg_cnt)*sizeof(int));
//__builtin_apply(ev_h->event_func,pp,ARG_OFFSET+(ev_h->arg_cnt+arg_cnt)*sizeof(int));
//ev_h->event_func(ev_id,arg);
//__builtin_apply(ev_h->event_func,args_p,(ev_h->arg_cnt+arg_cnt)*sizeof(int));
dyn_call(ev_h->event_func,ev_h->arg_cnt+arg_cnt,args);
}
va_end(p);
}
static struct event_t *find_event(int ev_id)
{
struct event_t *ev;
if (ev_id<EVENTS1_SIZE)
return events1+ev_id;
list_for_each_entry(ev,&events2,entry)
{
if (ev->ev_id==ev_id)
return ev;
}
return NULL;
}
static struct event_t *create_event(int ev_id)
{
struct event_t *ev=(struct event_t *)malloc(sizeof(struct event_t));
INIT_LIST_HEAD(&ev->handlers);
list_add_tail(&ev->entry,&events2);
return ev;
}
......@@ -6,337 +6,137 @@
#include "triton_p.h"
#define USE_GET_TIME
int max_events=128;
static __thread struct list_head handlers;
static __thread fd_set read_fds;
static __thread fd_set write_fds;
static __thread fd_set read_fds0;
static __thread fd_set write_fds0;
static __thread int md_term;
static int epoll_fd;
static struct epoll_event *epoll_events;
asm(".hidden md_init");
asm(".hidden md_run");
asm(".hidden md_terminate");
static pthread_t md_thr;
static void* md_thread(void *arg)
static void _triton_process_events(int wait);
void md_init()
int md_init()
{
INIT_LIST_HEAD(&handlers);
FD_ZERO(&read_fds);
FD_ZERO(&write_fds);
signal(SIGPIPE,SIG_IGN);
#ifdef USE_CORO
triton_coroutine_create(0,md_run,0,1);
#endif
}
void md_run()
{
md_term=0;
epoll_fd=epoll_create(0);
if (epoll_fd<0)
{
perror("epoll_create");
return -1;
}
while(!md_term)
epoll_events=malloc(MAX_EVENTS * sizeof(struct epoll_event));
if (!epoll_events)
{
_triton_process_events(1);
fprintf(stderr,"cann't allocate memory\n");
return -1;
}
}
default_ctx=malloc(sizeof(*default_ctx));
if (!default_ctx)
{
fprintf(stderr,"cann't allocate memory\n");
return -1;
}
#ifdef USE_CORO
asm(".hidden cur_uc");
ucontext_t cur_uc;
#endif
triton_register_ctx(default_ctx);
static void _triton_process_events(int wait)
return 0;
}
void md_run()
{
pthread_create(&md_thr,md_thread,NULL);
}
static void* md_thread(void *arg)
{
int max_fd=0,t;
struct md_handler_t *md_h;
struct triton_md_handler_t *h;
struct timeval tv1,tv2,twait0;
struct list_head *p1,*p2;
int twait,n;
int _break=0;
int timeout,i,n;
gettimeofday(&tv1,NULL);
_break=0;
if (wait)
{
twait=timer_prepare(&tv1);
#ifdef USE_CORO
t=coroutine_get_timeout(&tv1);
#else
t=-1;
#endif
if (t>=0 && (twait==-1 || t<twait)) twait=t;
list_for_each_entry(md_h,&handlers,entry)
{
if (md_h->in_handler) continue;
if (md_h->handler->twait>=0 && (twait==-1 || md_h->handler->twait<twait)) twait=md_h->handler->twait;
}
}else
{
twait=0;
}
read_fds0=read_fds; write_fds0=write_fds;
list_for_each_entry(md_h,&handlers,entry)
{
if (md_h->in_handler)
{
FD_CLR(md_h->fd,&read_fds0);
FD_CLR(md_h->fd,&write_fds0);
}else
{
if (md_h->fd>max_fd) max_fd=md_h->fd;
}
}
twait0=(struct timeval){twait/1000,(twait%1000)*1000};
n=select(max_fd+1,&read_fds0,&write_fds0,NULL,twait>=0?&twait0:NULL);
gettimeofday(&tv2,NULL);
twait=(tv2.tv_sec-tv1.tv_sec)*1000+(tv2.tv_usec-tv1.tv_usec)/1000;
list_for_each_safe(p1,p2,&handlers)
{
md_h=list_entry(p1,struct md_handler_t,entry);
//if (!md_h->del)
{
if (md_h->handler->twait>=0)
{
md_h->handler->twait-=twait;
if (md_h->handler->twait<=0) md_h->timeout=1;
}
}
}
timer_check(&tv2);
gettimeofday(&tv2,NULL);
#ifdef USE_CORO
coroutine_check_timeout(&tv2);
#endif
list_for_each_safe(p1,p2,&handlers)
{
md_h=list_entry(p1,struct md_handler_t,entry);
if (md_h->in_handler) continue;
if (!md_h->del)
{
if (md_h->timeout)
{
md_h->timeout=0;
#ifdef USE_CORO
md_h->in_handler=1;
if (md_h->coro)
{
long int id=(long int)md_h->coro;
md_h->coro=NULL;
triton_coroutine_wakeup(id);
}else
#endif
{
md_h->handler->timeout(md_h->handler);
}
md_h->in_handler=0;
if (_break) return;
}
}
}
if (n<0)
{
perror("triton: md(select)");
//goto check_timeout;
}
if (n>0)
{
list_for_each_safe(p1,p2,&handlers)
{
md_h=list_entry(p1,struct md_handler_t,entry);
if (md_h->in_handler) continue;
if (md_h->del) continue;
md_h->in_handler=1;
if (FD_ISSET(md_h->fd,&read_fds0))
{
if (md_h->handler->read==md_h->handler->write)
FD_CLR(md_h->fd,&write_fds0);
#ifdef USE_CORO
if (md_h->coro)
{
long int id=(long int)md_h->coro;
md_h->coro=NULL;
triton_coroutine_wakeup(id);
}else
#endif
{
md_h->handler->read(md_h->handler);
}
}
if (!md_h->del && FD_ISSET(md_h->fd,&write_fds0) && md_h->handler->write)
{
#ifdef USE_CORO
if (md_h->coro)
{
long int id=(long int)md_h->coro;
md_h->coro=NULL;
triton_coroutine_wakeup(id);
}else
#endif
{
md_h->handler->write(md_h->handler);
}
}
md_h->in_handler=0;
if (_break) return;
}
}
//check_timeout:
for(p1=handlers.next; p1!=&handlers;)
{
md_h=list_entry(p1,struct md_handler_t,entry);
p1=p1->next;
if (md_h->del)
{
list_del(&md_h->entry);
free(md_h);
}
}
if (!wait) _break=1;
}
void triton_process_events(void)
{
_triton_process_events(0);
n=epoll_wait(epoll_fd,epoll_events,MAX_EVENTS,-1);
if (n<0)
{
if (errno!=EINTR)
perror("epoll_wait");
continue;
}
if (n==0)
return;
for(i=0; i<n; i++)
{
h=(struct triton_md_handler_t*)epoll_events[i].data.ptr;
pthread_mutex_lock(&h->ctx->lock);
h->trig_epoll_events=epoll_events[i].events;
list_add_tail(&h->entry2,&h->ctx->pending_handlers);
h->pending=1;
triton_queue_ctx(h->ctx);
pthread_mutex_unlock(&h->ctx->lock);
}
}
void md_terminate()
{
md_term=1;
}
void triton_md_register_handler(struct triton_md_handler_t *h)
{
struct md_handler_t *md_h;
list_for_each_entry(md_h,&handlers,entry)
{
if (md_h->handler==h)
{
if (!md_h->del)
{
printf("triton: bug: double triton_md_register_handler\n");
abort();
}
md_h->del=0;
md_h->in_handler=0;
md_h->coro=0;
md_h->fd=0;
return;
}
}
md_h=(struct md_handler_t *)malloc(sizeof(struct md_handler_t));
memset(md_h,0,sizeof(*md_h));
md_h->handler=h;
list_add_tail(&md_h->entry,&handlers);
h->epoll_event.data.ptr=h;
if (!h->ctx)
h->ctx=default_ctx;
pthread_mutex_lock(&h->ctx->lock);
list_add_tail(&h->entry,&h->ctx->handlers);
pthread_mutex_unlock(&h->ctx->lock);
}
void triton_md_unregister_handler(struct triton_md_handler_t *h)
{
struct md_handler_t *md_h;
list_for_each_entry(md_h,&handlers,entry)
{
if (md_h->handler==h)
{
triton_md_disable_handler(h,0);
/*list_del(&md_h->entry);
free(md_h);
return;*/
md_h->del=1;
return;
}
}
pthread_mutex_lock(&h->ctx->lock);
list_del(&h->entry);
if (h->pending)
list_del(&h->entry2);
pthread_lock_unlock(&h->ctx->lock);
}
void triton_md_enable_handler(struct triton_md_handler_t *h, int mode)
int triton_md_enable_handler(struct triton_md_handler_t *h, int mode)
{
struct md_handler_t *md_h;
int r;
int events=h->epoll_event.events;
list_for_each_entry(md_h,&handlers,entry)
{
if (md_h->handler==h)
{
md_h->fd=h->fd;
break;
}
}
if (mode)
{
if (mode&MD_MODE_READ)
FD_SET(h->fd,&read_fds);
if (mode&MD_MODE_WRITE)
FD_SET(h->fd,&write_fds);
}else
{
FD_SET(h->fd,&read_fds);
FD_SET(h->fd,&write_fds);
}
}
void triton_md_disable_handler(struct triton_md_handler_t *h,int mode)
{
if (mode)
{
if (mode&MD_MODE_READ)
FD_CLR(h->fd,&read_fds);
if (mode&MD_MODE_WRITE)
FD_CLR(h->fd,&write_fds);
}else
{
FD_CLR(h->fd,&read_fds);
FD_CLR(h->fd,&write_fds);
}
if (mode&MD_MODE_READ)
h->epoll_event.events|=EPOLLIN;
if (mode&MD_MODE_WRITE)
h->epoll_event.events|=EPOLLOUT;
h->epoll_event.events|=EPOLLET;
if (events)
r=epoll_ctl(epoll_fd,EPOLL_CTL_MOD,h->fd,&h->epoll_event);
else
r=epoll_ctl(epoll_fd,EPOLL_CTL_ADD,h->fd,&h->epoll_event);
return r;
}
#ifdef USE_CORO
int triton_md_wait(struct triton_md_handler_t *h)
int triton_md_disable_handler(struct triton_md_handler_t *h,int mode)
{
struct md_handler_t *md_h;
int res=0;
list_for_each_entry(md_h,&handlers,entry)
if (h->epoll_events.events)
return -1;
if (mode&MD_MODE_READ)
h->epoll_event.events&=~EPOLLIN;
if (mode&MD_MODE_WRITE)
h->epoll_event.events&=~EPOLLOUT;
if (h->epoll_event.events&(EPOLLIN|EPOLLOUT))
r=epoll_ctl(epoll_fd,EPOLL_CTL_MOD,h->fd,&h->epoll_event);
else
{
if (md_h->handler==h) break;
h->epoll_event.events=0;
r=epoll_ctl(epoll_fd,EPOLL_CTL_DEL,h->fd,NULL);
}
md_h->in_handler=0;
md_h->coro=current_coro;
triton_coroutine_schedule();
if (FD_ISSET(md_h->fd,&read_fds0)) res|=MD_MODE_READ;
if (FD_ISSET(md_h->fd,&write_fds0)) res|=MD_MODE_WRITE;
return res;
}
int triton_md_wait2(int fd,int mode,int timeout)
{
int r;
struct triton_md_handler_t h=
{
.fd=fd,
.twait=timeout,
};
triton_md_register_handler(&h);
triton_md_enable_handler(&h,mode);
r=triton_md_wait(&h);
triton_md_unregister_handler(&h);
return r;
return r;
}
#endif
#include "triton_p.h"
int max_threads=128;
int thread_idletime=60; //seconds
static pthread_mutex_t threads_lock=PTHREAD_MUTEX_INITIALIZER;
static LIST_HEAD(threads);
static int threads_count;
static pthread_mutex_t ctx_queue_lock=PTHREAD_MUTEX_INITIALIZER;
static LIST_HEAD(ctx_queue);
static pthread_mutex_t ctx_list_lock=PTHREAD_MUTEX_INITIALIZER;
static LIST_HEAD(ctx_list);
struct triton_ctx_t *default_ctx;
void triton_thread_wakeup(struct triton_thread_t *thread)
{
pthread_mutex_lock(&h->ctx->thread->lock);
pthread_cont_signal(&h->ctx->thread->cond);
pthread_mutex_unlock(&h->ctx->thread->lock);
}
static void* triton_thread(struct triton_thread_t *thread)
{
struct triton_md_handler_t *h;
struct triton_timer_t *t;
struct timespec abstime;
while(1)
{
abstime.tv_time=time(NULL)+thread_idletime;
abstime.tv_nsec=0;
pthread_mutex_lock(&thread->lock);
if (pthread_cond_timedwait(&thread->cond,&thread->lock,&abstime) && !thread->ctx)
thread->destroing=1;
pthread_mutex_unlock(&thread->lock);
if (thread->terminate)
return NULL;
if (thread->destroing)
{
pthread_mutex_lock(&threads_lock);
list_del(&thread->entry);
--threads_count;
pthread_mutex_unlock(&threads_lock);
free(thread);
return NULL;
}
cont:
if (thread->ctx->close)
{
list_for_each_entry(h,&thread->ctx->handlers,entry)
if (h->close)
h->close(h);
thread->ctx->close=0;
}
while (1)
{
pthread_mutex_lock(&thread->ctx->lock);
if (list_empty(&thread->ctx->pending_timers))
{
pthread_mutex_unlock(&thread->ctx->lock);
break;
}
t=list_entry(thread->ctx->pending_timers.next);
list_del(&t->entry2);
pthread_mutex_unlock(&thread->ctx->lock);
if (t->expire(t))
continue;
}
while (1)
{
pthread_mutex_lock(&thread->ctx->lock);
if (list_empty(&thread->ctx->pending_events))
{
pthread_mutex_unlock(&thread->ctx->lock);
break;
}
h=list_entry(thread->ctx->pending_events.next);
list_del(&h->entry2);
h->pending=0;
pthread_mutex_unlock(&thread->ctx->lock);
if (h->trig_epoll_events&(EPOLLIN|EPOLLERR|EPOLLHUP))
if (h->read)
if (h->read(h))
continue;
if (h->trig_epoll_events&(EPOLLOUT|EPOLLERR|EPOLLHUP))
if (h->write)
if (h->write(h))
continue;
h->trig_epoll_events=0;
/*if (h->twait==0)
if (h->timeout)
if (h->timeout(h))
continue;
if (h->twait>0)
triton_md_set_timeout(h,h->twait);*/
}
pthread_mutex_lock(&thread->ctx->lock);
if (!list_empty(&thread->ctx->pending_events) || !list_empty(&thread->ctx->pending_timers))
{
pthread_mutex_unlock(&thread->ctx->lock);
goto cont;
}
thread->ctx->thread=NULL;
thread->ctx=NULL;
pthread_mutex_unlock(&thread->ctx->lock);
pthread_mutex_lock(&threads_lock);
if (!list_empty(&ctx_queue))
{
thread->ctx=list_entry(ctx_queue.next);
pthread_mutex_lock(&thread->ctx->lock);
list_del(&ctx->entry2);
ctx->thread=thread;
ctx->queue=0;
pthread_mutex_unlock(&thread->ctx->lock);
pthread_mutex_unlock(&threads_lock);
goto cont;
}
list_add(&thread->entry,&threads);
pthread_mutex_unlock(&threads_lock);
}
}
struct triton_thread_t *create_thread()
{
struct triton_thread_t *thread=malloc(sizeof(*thread));
memset(thread,0,sizeof(*thread));
pthread_mutex_init(&thread->lock);
pthread_cond_init(&thread->cond);
pthread_create(&thread->thread,NULL,md_thread,thread);
++threads_count;
return thread;
}
void triton_queue_ctx(struct triton_ctx_t *ctx)
{
if (ctx->thread || ctx->queued)
return;
pthread_mutex_lock(&threads_lock);
if (list_empty(&threads))
{
if (threads_count>=max_threads)
{
list_add_tail(&ctx->entry2,&ctx_queue);
ctx->queued=1;
pthread_mutex_unlock(&threads_lock);
return;
}
ctx->thread=create_thread();
}else
{
ctx->thread=list_entry(threads.next);
pthread_mutex_lock(&ctx->thread->lock);
if (ctx->thread->destroing)
{
pthread_mutex_unlock(&ctx->thread->lock);
ctx->thread=create_thread();
}else
{
list_del(&ctx->thread->entry);
pthread_mutex_unlock(&ctx->thread->lock);
}
}
pthread_mutex_unlock(&threads_lock);
triton_thread_wakeup(ctx->thread);
}
void triton_register_ctx(struct triton_ctx_t *ctx)
{
pthread_mutex_init(&ctx->lock);
INIT_LIST_HEAD(&ctx->handlers);
INIT_LIST_HEAD(&ctx->timers);
INIT_LIST_HEAD(&ctx->pending_handlers);
INIT_LIST_HEAD(&ctx->pending_timers);
pthread_mutex_lock(&ctx_list_lock);
list_add_tail(&ctx->entry,&ctx_list);
pthread_mutex_unlock(&ctx_list_lock);
}
void triton_unregister_ctx(struct triton_ctx_t *ctx)
{
pthread_mutex_lock(&ctx_list_lock);
list_add_tail(&ctx->entry,&ctx_list);
pthread_mutex_unlock(&ctx_list_lock);
}
void triton_init()
{
md_init();
timer_init();
}
void triton_run()
{
md_run();
timer_run();
}
void triton_terminate()
{
struct triton_ctx_t *ctx;
pthread_mutex_lock(&ctx_list_lock);
list_for_each_entry(ctx,&ctx_list,entry)
{
pthread_mutex_lock(&ctx->lock);
ctx->close=1;
triton_queue_ctx(ctx);
pthread_mutex_unlock(&ctx->lock);
}
pthread_mutex_unlock(&ctx_list_lock);
timer_terminate();
md_terminate();
}
......@@ -2,16 +2,67 @@
#define TRITON_H
#include <sys/time.h>
#include <pthread.h>
#include <sys/epoll.h>
struct triton_thread_t
{
struct list_head entry;
pthread_mutex_t lock;
pthread_cond_t cond;
pthread_t thread;
int terminate:1;
int destroing:1;
struct timeval tv;
struct triton_ctx_t *ctx;
};
struct triton_ctx_t
{
struct list_head entry;
struct list_head entry2;
pthread_mutex_t lock;
struct list_head handlers;
struct list_head timers;
triton_thread_t *thread;
struct list_head pending_handlers;
struct list_head pending_timers;
int queued:1;
int close:1;
};
struct triton_md_handler_t
{
//triton part
//==========
struct list_head entry;
struct list_head entry2;
struct triton_ctx_t *ctx;
struct epoll_event epoll_event;
uint32_t trig_epoll_event;
int pending:1;
//=========
//user part
//=========
int fd;
int twait;
void *pd;
void (*read)(struct triton_md_handler_t *h);
void (*write)(struct triton_md_handler_t *h);
void (*timeout)(struct triton_md_handler_t *h);
int (*read)(struct triton_md_handler_t *);
int (*write)(struct triton_md_handler_t *);
void (*close)(struct triton_md_handler_t *);
//=========
};
struct triton_timer_t
{
struct list_head entry;
int active;
int pending:1;
struct timeval expire_tv;
int period;
int (*expire)(struct triton_timer_t *);
};
#define MD_MODE_READ 1
......@@ -20,18 +71,7 @@ void triton_md_register_handler(struct triton_md_handler_t *h);
void triton_md_unregister_handler(struct triton_md_handler_t *h);
void triton_md_enable_handler(struct triton_md_handler_t *h, int mode);
void triton_md_disable_handler(struct triton_md_handler_t *h,int mode);
int triton_md_wait(struct triton_md_handler_t *h);
int triton_md_wait2(int fd,int mode,int timeout);
struct triton_timer_t
{
struct timeval expire_tv;
int period;
void *pd;
int active;
int (*expire)(struct triton_timer_t*);
};
void triton_md_set_timeout(struct triton_md_handler_t *h, int msec);
void triton_timer_add(struct triton_timer_t*);
void triton_timer_del(struct triton_timer_t*);
......@@ -41,11 +81,6 @@ void triton_timer_single_shot1(int twait,triton_ss_func,int argc,...);
void triton_timer_single_shot2(struct timeval *shot_tv,triton_ss_func,int argc,...);
void triton_timer_single_shot3(int tv_sec,int tv_usec,triton_ss_func,int argc,...);
typedef void (*triton_event_func)(void);
void triton_event_register_handler(int ev_id,triton_event_func,int argc,...);
void triton_event_unregister_handler(int ev_id,triton_event_func);
void triton_event_fire(int ev_id,int argc,...);
int triton_get_int_option(const char *str);
const char* triton_get_str_option(const char *str);
double triton_get_double_option(const char *str);
......@@ -53,16 +88,6 @@ double triton_get_double_option(const char *str);
void triton_terminate(void);
void triton_process_events(void);
#ifdef USE_CORO
#define DEF_COROUTINE_STACK 64*1024
typedef void (*triton_coroutine_func)(void*);
long int triton_coroutine_create(int stack_size,triton_coroutine_func func,void *arg,int run);
void triton_coroutine_delete(long int id);
void triton_coroutine_wakeup(long int id);
void triton_coroutine_schedule();
int triton_coroutine_schedule_timeout(int msec);
#endif
#define TRITON_OK 0
#define TRITON_ERR_NOCOMP -1
#define TRITON_ERR_NOSUPP -2
......
......@@ -5,7 +5,6 @@
#include "list.h"
#include <stdarg.h>
#include <ucontext.h>
#define MAX_ARGS 32
......@@ -17,20 +16,6 @@ struct option_t
char *val;
};
struct md_handler_t
{
struct list_head entry;
int fd;
int del;
int timeout;
int volatile in_handler;
struct coroutine_t *coro;
struct triton_md_handler_t *handler;
};
struct timer_t
{
struct list_head entry;
......@@ -48,93 +33,10 @@ struct timer_single_shot_t
triton_ss_func ss_func;
};
struct event_handler_t
{
struct list_head entry;
int arg_cnt;
void *args;
triton_event_func event_func;
};
struct event_t
{
struct list_head entry;
int ev_id;
struct list_head handlers;
};
struct coroutine_t
{
struct list_head entry;
ucontext_t uc;
struct timeval timeout;
struct timeval time;
};
extern struct list_head components;
extern void md_run();
extern void md_terminate();
extern int timer_prepare(struct timeval *tv);
extern void timer_check(struct timeval *tv);
extern int coroutine_get_timeout(struct timeval *tv);
extern void coroutine_check_timeout(struct timeval *tv);
extern void event_init();
extern struct coroutine_t *current_coro;
void schedule(void);
//#define BROKEN_GCC
#ifdef BROKEN_GCC
#define dyn_call(func,arg_cnt,args)\
{\
switch(arg_cnt)\
{\
case 0: \
{\
typedef void (*func0)(void);\
((func0)func)();\
break;\
}\
case 1: \
{\
typedef void (*func0)(long);\
((func0)func)(*((long*)args+0));\
break;\
}\
case 2: \
{\
typedef void (*func0)(long,long);\
((func0)func)(*((long*)args+0),*((long*)args+1));\
break;\
}\
case 3: \
{\
typedef void (*func0)(long,long,long);\
((func0)func)(*((long*)args+0),*((long*)args+1),*((long*)args+2));\
break;\
}\
case 4: \
{\
typedef void (*func0)(long,long,long,long);\
((func0)func)(*((long*)args+0),*((long*)args+1),*((long*)args+2),*((long*)args+3));\
break;\
}\
}\
}
#else
#define dyn_call(func,arg_cnt,args)\
{\
int aaa=arg_cnt*sizeof(long);\
asm("subl %2,%%esp; \n\
movl %%esp,%%edi;\n\
movl %0,%%esi;\n\
cld;\n\
rep movsl;\n\
call *%1;\n\
addl %2,%%esp\n"::"m" (args),"m" (func),"g" (aaa),"c"(arg_cnt):"%edi","%esi","%esp");\
}
#endif
extern void timer_run();
extern void timer_terminate();
extern struct triton_ctx_t *default_ctx;
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册