提交 1e7037e8 编写于 作者: F freemine

epoll and kqueue

上级 499644d5
......@@ -39,6 +39,7 @@ struct ep_over_kq_s {
int sv[2]; // 0 for read, 1 for write
int evs_count;
eok_event_t *evs_head;
eok_event_t *evs_tail;
eok_event_t *evs_free;
......@@ -82,6 +83,139 @@ static eoks_t eoks = {
.eoks_free = NULL,
};
/*
 * Translate an epoll_ctl() operation code into its symbolic name for logging.
 * Returns a string literal; any value that is not one of the three
 * EPOLL_CTL_* operations yields "UNKNOWN".
 */
static const char* op_str(int op) {
  const char *name = "UNKNOWN";
  if (op == EPOLL_CTL_ADD) {
    name = "EPOLL_CTL_ADD";
  } else if (op == EPOLL_CTL_MOD) {
    name = "EPOLL_CTL_MOD";
  } else if (op == EPOLL_CTL_DEL) {
    name = "EPOLL_CTL_DEL";
  }
  return name;
}
// Per-thread scratch buffers that events_str()/kev_flags_str() format into.
// Several slots exist so that more than one formatted mask can be passed to
// a single log statement without the results overwriting each other.
static __thread char buf_slots[10][1024] = {0};
// Capacity, in chars, of one slot (the inner dimension).
static __thread int buf_slots_linelen = sizeof(buf_slots[0])/sizeof(buf_slots[0][0]);
// Number of slots (the outer dimension): whole array divided by one row.
// Dividing by the row's element count instead only gives the right answer
// while the element type is exactly one byte wide.
static __thread int buf_slots_count = sizeof(buf_slots)/sizeof(buf_slots[0]);
/*
 * Render an epoll event mask as "NAME|NAME|..." for log output.
 *
 * events: bit mask of EPOLL* flags.
 * slots:  index of the per-thread scratch buffer to format into; asserted to
 *         be within [0, buf_slots_count). Use distinct slots when several
 *         formatted masks are needed by the same log statement.
 *
 * Returns a pointer into buf_slots[slots]; the string is empty when none of
 * the flags below is set, and is overwritten by any later call on this thread
 * that formats into the same slot. If the names do not fit, the result is
 * truncated (still NUL-terminated) and no further names are appended.
 *
 * Flag values, for reference:
 *   EPOLLIN        0x001     EPOLLPRI     0x002     EPOLLOUT     0x004
 *   EPOLLERR       0x008     EPOLLHUP     0x010     EPOLLRDNORM  0x040
 *   EPOLLRDBAND    0x080     EPOLLWRNORM  0x100     EPOLLWRBAND  0x200
 *   EPOLLMSG       0x400     EPOLLRDHUP   0x2000
 *   EPOLLEXCLUSIVE 1u << 28  EPOLLWAKEUP  1u << 29
 *   EPOLLONESHOT   1u << 30  EPOLLET      1u << 31
 */
static const char* events_str(uint32_t events, int slots) {
  A(slots>=0 && slots<buf_slots_count, "internal logic error");
  char *buf = buf_slots[slots];
  char *p = buf;
  size_t len = buf_slots_linelen;
  int n = 0;
  buf[0] = '\0';

  // snprintf() returns the length it *wanted* to write (or a negative value
  // on error), so the result must be validated before advancing `p` and
  // shrinking `len`; otherwise `len` (unsigned) would wrap around and later
  // appends could run past the end of the slot.
#define CHK_EV(ev)                                                   \
  do {                                                               \
    if (len>0 && (events & (ev))==(ev)) {                            \
      n = snprintf(p, len, "%s%s", p!=buf ? "|" : "", #ev);          \
      if (n<0) {                                                     \
        *p = '\0'; /* drop whatever a failed call left behind */     \
        len = 0;                                                     \
      } else if ((size_t)n>=len) {                                   \
        len = 0;   /* truncated: buffer is full, stop appending */   \
      } else {                                                       \
        p += n;                                                      \
        len -= (size_t)n;                                            \
      }                                                              \
    }                                                                \
  } while (0)

  CHK_EV(EPOLLIN);
  CHK_EV(EPOLLPRI);
  CHK_EV(EPOLLOUT);
  CHK_EV(EPOLLRDNORM);
  CHK_EV(EPOLLRDBAND);
  CHK_EV(EPOLLWRNORM);
  CHK_EV(EPOLLWRBAND);
  CHK_EV(EPOLLMSG);
  CHK_EV(EPOLLERR);
  CHK_EV(EPOLLHUP);
  CHK_EV(EPOLLRDHUP);
  CHK_EV(EPOLLEXCLUSIVE);
  CHK_EV(EPOLLWAKEUP);
  CHK_EV(EPOLLONESHOT);
  CHK_EV(EPOLLET);
#undef CHK_EV
  return buf;
}
/*
 * Render a kevent `flags` field as "NAME|NAME|..." for log output.
 *
 * flags: the 16-bit flags value of a kevent.
 * slots: index of the per-thread scratch buffer to format into; asserted to
 *        be within [0, buf_slots_count). Use distinct slots when several
 *        formatted masks are needed by the same log statement.
 *
 * Returns a pointer into buf_slots[slots]; the string is empty when none of
 * the flags below is set, and is overwritten by any later call on this thread
 * that formats into the same slot. If the names do not fit, the result is
 * truncated (still NUL-terminated) and no further names are appended.
 *
 * A name is emitted only when *all* of its bits are set, so the composite
 * masks EV_DISPATCH2 and EV_SYSFLAGS appear in addition to their components.
 *
 * Flag values, for reference:
 *   actions
 *     EV_ADD            0x0001  add event to kq (implies enable)
 *     EV_DELETE         0x0002  delete event from kq
 *     EV_ENABLE         0x0004  enable event
 *     EV_DISABLE        0x0008  disable event (not reported)
 *   flags
 *     EV_ONESHOT        0x0010  only report one occurrence
 *     EV_CLEAR          0x0020  clear event state after reporting
 *     EV_RECEIPT        0x0040  force immediate event output
 *     EV_DISPATCH       0x0080  disable event after reporting
 *     EV_UDATA_SPECIFIC 0x0100  unique kevent per udata value
 *     EV_DISPATCH2      (EV_DISPATCH | EV_UDATA_SPECIFIC)
 *     EV_VANISHED       0x0200  report that source has vanished
 *                               (only valid with EV_DISPATCH2)
 *     EV_SYSFLAGS       0xF000  reserved by system
 *     EV_FLAG0          0x1000  filter-specific flag
 *     EV_FLAG1          0x2000  filter-specific flag
 *   returned values
 *     EV_EOF            0x8000  EOF detected
 *     EV_ERROR          0x4000  error, data contains errno
 */
static const char* kev_flags_str(uint16_t flags, int slots) {
  A(slots>=0 && slots<buf_slots_count, "internal logic error");
  char *buf = buf_slots[slots];
  char *p = buf;
  size_t len = buf_slots_linelen;
  int n = 0;
  buf[0] = '\0';

  // snprintf() returns the length it *wanted* to write (or a negative value
  // on error), so the result must be validated before advancing `p` and
  // shrinking `len`; otherwise `len` (unsigned) would wrap around and later
  // appends could run past the end of the slot.
#define CHK_EV(ev)                                                   \
  do {                                                               \
    if (len>0 && (flags & (ev))==(ev)) {                             \
      n = snprintf(p, len, "%s%s", p!=buf ? "|" : "", #ev);          \
      if (n<0) {                                                     \
        *p = '\0'; /* drop whatever a failed call left behind */     \
        len = 0;                                                     \
      } else if ((size_t)n>=len) {                                   \
        len = 0;   /* truncated: buffer is full, stop appending */   \
      } else {                                                       \
        p += n;                                                      \
        len -= (size_t)n;                                            \
      }                                                              \
    }                                                                \
  } while (0)

  CHK_EV(EV_ADD);
  CHK_EV(EV_DELETE);
  CHK_EV(EV_ENABLE);
  CHK_EV(EV_DISABLE);
  CHK_EV(EV_ONESHOT);
  CHK_EV(EV_CLEAR);
  CHK_EV(EV_RECEIPT);
  CHK_EV(EV_DISPATCH);
  CHK_EV(EV_UDATA_SPECIFIC);
  CHK_EV(EV_DISPATCH2);
  CHK_EV(EV_VANISHED);
  CHK_EV(EV_SYSFLAGS);
  CHK_EV(EV_FLAG0);
  CHK_EV(EV_FLAG1);
  CHK_EV(EV_EOF);
  CHK_EV(EV_ERROR);
#undef CHK_EV
  return buf;
}
static ep_over_kq_t* eoks_alloc(void);
static void eoks_free(ep_over_kq_t *eok);
static ep_over_kq_t* eoks_find(int epfd);
......@@ -135,10 +269,15 @@ int epoll_create(int size) {
return -1;
}
D("epoll_create epfd:[%d]", eok->idx);
return eok->idx;
}
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
D("epoll_ctling epfd:[%d], op:[%s], fd:[%d], events:[%04x:%s]",
epfd, op_str(op), fd,
event ? event->events : 0,
event ? events_str(event->events, 0) : "NULL");
int e = 0;
if (epfd<0 || epfd>=eoks.neoks) {
errno = EBADF;
......@@ -198,6 +337,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
ev.changed = 2;
} break;
case EPOLL_CTL_DEL: {
D("epoll_ctl adding...");
// event is ignored
pev = &oev->epev;
flags = EV_DELETE;
......@@ -213,11 +353,9 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
if (pev->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
flags |= EV_EOF;
EV_SET64(&krev, ev.fd, EVFILT_READ, flags, 0, 0, -1, 0, 0);
D("....");
}
if (pev->events & EPOLLOUT) {
EV_SET64(&kwev, ev.fd, EVFILT_WRITE, flags, 0, 0, -1, 0, 0);
D("....");
}
if (eok_chgs_refresh(eok, oev, &ev, &krev, &kwev)) {
......@@ -244,14 +382,17 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
int e = 0;
if (epfd<0 || epfd>=eoks.neoks) {
errno = EBADF;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
return -1;
}
if (!events) {
errno = EINVAL;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
return -1;
}
if (maxevents<=0) {
errno = EINVAL;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
return -1;
}
......@@ -260,6 +401,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
ep_over_kq_t *eok = eoks_find(epfd);
if (!eok) {
errno = EBADF;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
return -1;
}
......@@ -281,6 +423,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
struct kevent64_s *eventslist = eok_alloc_eventslist(eok, maxevents);
if (!eventslist) {
e = ENOMEM;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
break;
}
memset(eventslist, 0, maxevents * sizeof(*eventslist));
......@@ -305,8 +448,15 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
eok->ichanges = 0;
A(0==pthread_mutex_unlock(&eok->lock), "");
if (ichanges>0) {
D("kevent64 changing [%d] changes and waiting...", ichanges);
}
errno = 0;
r = kevent64(eok->kq, kchanges, ichanges, eventslist, maxevents, 0, pto);
e = errno;
if (e) {
E("kevent64 waiting done, with r[%d]", r);
}
A(0==pthread_mutex_lock(&eok->lock), "");
eok->waiting = 0;
......@@ -318,7 +468,6 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
}
eok->waiting = 0;
if (r<0) break;
if (r==0) {
A(timeout!=-1, "internal logic error");
}
......@@ -327,7 +476,10 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
A(kev->udata && eok->evs_head && eok->evs_tail, "internal logic error");
eok_event_t *ev = (eok_event_t*)kev->udata;
A(kev->ident == ev->fd, "internal logic error");
D("...");
if (kev->flags & EV_ERROR) {
D("error when processing change list for fd[%d], error[%s], kev_flags:[%04x:%s]",
ev->fd, strerror(kev->data), kev->flags, kev_flags_str(kev->flags, 0));
}
switch (kev->filter) {
case EVFILT_READ: {
A((ev->epev.events & EPOLLIN), "internal logic errro");
......@@ -335,10 +487,10 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
char c = '\0';
A(1==recv(kev->ident, &c, 1, 0), "internal logic error");
A(0==memcmp(&c, "1", 1), "internal logic error");
D("...............");
D("wokenup");
} else {
if (ev->changed==3) {
D("already requested to delete");
D("already requested to delete for fd[%d]", ev->fd);
// EV_DELETE?
continue;
}
......@@ -347,8 +499,13 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
pev.events = EPOLLIN;
if (kev->flags & EV_EOF) {
pev.events |= (EPOLLHUP | EPOLLERR | EPOLLRDHUP);
D("..........");
}
pev.events &= ev->epev.events;
pev.events = pev.events & ev->epev.events;
D("events found for fd[%d]: [%04x:%s], which was registered: [%04x:%s], kev_flags: [%04x:%s]",
ev->fd, pev.events, events_str(pev.events, 0),
ev->epev.events, events_str(ev->epev.events, 1),
kev->flags, kev_flags_str(kev->flags, 2));
events[cnts++] = pev;
}
} break;
......@@ -360,11 +517,26 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
} break;
}
}
if (r>=0) {
eok_event_t *p = eok->evs_head;
while (p) {
eok_event_t *next = p->next;
if (p->changed==3) {
D("removing registered event for fd[%d]: [%04x:%s]", p->fd, p->epev.events, events_str(p->epev.events, 0));
eok_free_ev(eok, p);
}
p = next;
}
}
} while (cnts==0);
if (cnts>0) {
D("kevent64 waiting done with [%d] events", cnts);
}
A(0==pthread_mutex_unlock(&eok->lock), "");
if (e) {
errno = e;
E("epoll_wait failed");
return -1;
}
......@@ -383,6 +555,7 @@ static struct timespec do_timespec_diff(struct timespec *from, struct timespec *
}
int epoll_close(int epfd) {
D("epoll_closing epfd: [%d]", epfd);
if (epfd<0 || epfd>=eoks.neoks) {
errno = EBADF;
return -1;
......@@ -445,6 +618,8 @@ static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) {
else eok->evs_head = p;
eok->evs_tail = p;
eok->evs_count += 1;
return p;
}
......@@ -456,6 +631,8 @@ static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev) {
else eok->evs_tail = ev->prev;
ev->next = eok->evs_free;
eok->evs_free = ev->next;
eok->evs_count -= 1;
}
static void eok_wakeup(ep_over_kq_t *eok) {
......@@ -486,17 +663,23 @@ static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev
}
oev->fd = ev->fd;
oev->epev = ev->epev;
if (ev->changed!=3) {
oev->epev = ev->epev;
}
oev->changed = ev->changed;
n = 0;
if (krev->ident==ev->fd) {
krev->udata = (uint64_t)oev;
eok->kchanges[eok->ichanges++] = *krev;
++n;
}
if (kwev->ident==ev->fd) {
kwev->udata = (uint64_t)oev;
eok->kchanges[eok->ichanges++] = *kwev;
++n;
}
D("add changes[%d] for fd[%d], and changes/registers [%d/%d]", n, ev->fd, eok->ichanges, eok->evs_count);
return 0;
}
......
......@@ -54,18 +54,74 @@ static ep_t* ep_create(void);
static void ep_destroy(ep_t *ep);
static void* routine(void* arg);
static int open_connect(unsigned short port);
static int open_listen(unsigned short port);
// Per-socket context. A pointer to one of these is stored in
// epoll_event.data.ptr when the socket is registered, and the polling
// routine casts data.ptr back to client_t* to dispatch through on_event.
typedef struct client_s client_t;
struct client_s {
// socket descriptor this context belongs to
int skt;
// handler called with the epoll instance, the reported event and this context
void (*on_event)(ep_t *ep, struct epoll_event *events, client_t *client);
// role of the socket, see values below
// NOTE(review): `volatile` alone does not synchronize access between threads - confirm intent
volatile unsigned int state; // 1: listening; 2: connected
};
static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client);
int main(int argc, char *argv[]) {
ep_t* ep = ep_create();
A(ep, "failed");
int skt = open_connect(6789);
if (skt!=-1) {
struct epoll_event ev = {0};
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.data.ptr = &skt;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), "");
client_t *client = (client_t*)calloc(1, sizeof(*client));
if (client) {
client->skt = skt;
client->on_event = echo_event;
client->state = 2;
struct epoll_event ev = {0};
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.data.ptr = client;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), "");
}
}
skt = open_listen(0);
if (skt!=-1) {
client_t *client = (client_t*)calloc(1, sizeof(*client));
if (client) {
client->skt = skt;
client->on_event = echo_event;
client->state = 1;
struct epoll_event ev = {0};
ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
ev.data.ptr = client;
A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), "");
}
}
// char c = '\0';
// while ((c=getchar())!=EOF) {
// switch (c) {
// case 'q': break;
// default: continue;
// }
// }
// getchar();
char *line = NULL;
size_t linecap = 0;
ssize_t linelen;
while ((linelen = getline(&line, &linecap, stdin)) > 0) {
line[strlen(line)-1] = '\0';
if (0==strcmp(line, "exit")) break;
if (0==strcmp(line, "quit")) break;
if (line==strstr(line, "close")) {
int fd = 0;
sscanf(line, "close %d", &fd);
if (fd<=2) {
fprintf(stderr, "fd [%d] invalid\n", fd);
continue;
}
A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, fd, NULL), "");
continue;
}
if (strlen(line)==0) continue;
fprintf(stderr, "unknown cmd:[%s]\n", line);
}
getchar();
ep_destroy(ep);
D("");
return 0;
......@@ -114,7 +170,7 @@ static void* routine(void* arg) {
A(0==pthread_mutex_unlock(&ep->lock), "");
int r = epoll_wait(ep->ep, evs, sizeof(evs)/sizeof(evs[0]), -1);
A(r>0, "indefinite epoll_wait shall not timeout");
A(r>0, "indefinite epoll_wait shall not timeout:[%d]", r);
A(0==pthread_mutex_lock(&ep->lock), "");
A(ep->waiting==1, "internal logic error");
......@@ -133,23 +189,50 @@ static void* routine(void* arg) {
continue;
}
A(ev->data.ptr, "internal logic error");
int skt = *(int*)ev->data.ptr;
if (ev->events & EPOLLIN) {
char buf[4];
int n = recv(skt, buf, sizeof(buf)-1, 0);
A(n>=0 && n<sizeof(buf), "internal logic error");
buf[n] = '\0';
fprintf(stderr, "events[%x]:%s\n", ev->events, buf);
}
if (ev->events & EPOLLRDHUP) {
A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, skt, NULL), "");
}
client_t *client = (client_t*)ev->data.ptr;
client->on_event(ep, ev, client);
continue;
}
}
return NULL;
}
/*
 * Create a TCP socket bound to 127.0.0.1:<port> and put it into listening
 * state (backlog 100). The port actually bound, as reported by
 * getsockname(), is written to the debug log.
 *
 * Returns the listening socket, or -1 when socket(), bind() or listen()
 * fails (the socket is closed in the latter two cases). A getsockname()
 * failure is only logged; the socket is still returned.
 */
static int open_listen(unsigned short port) {
  int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
  if (skt==-1) {
    E("socket() failed");
    return -1;
  }

  struct sockaddr_in si = {0};
  socklen_t len = sizeof(si);

  si.sin_family = AF_INET;
  si.sin_addr.s_addr = inet_addr("127.0.0.1");
  si.sin_port = htons(port);

  if (bind(skt, (struct sockaddr*)&si, sizeof(si))) {
    E("bind(%u) failed", port);
    goto fail;
  }
  if (listen(skt, 100)) {
    E("listen() failed");
    goto fail;
  }

  // query the bound address again to learn which port was really bound
  memset(&si, 0, sizeof(si));
  if (getsockname(skt, (struct sockaddr *)&si, &len)) {
    E("getsockname() failed");
  }
  A(len==sizeof(si), "internal logic error");
  D("listenning at: %d", ntohs(si.sin_port));
  return skt;

fail:
  close(skt);
  return -1;
}
static int open_connect(unsigned short port) {
int r = 0;
int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
......@@ -181,3 +264,41 @@ static int open_connect(unsigned short port) {
return -1;
}
/*
 * on_event handler shared by listening (state 1) and connected (state 2)
 * sockets.
 *
 * ep:     epoll wrapper the socket is registered with.
 * ev:     the event reported for this socket.
 * client: context of the socket the event belongs to.
 *
 * On EPOLLIN:
 *   - listening socket: accept one connection, allocate a context for it and
 *     register it with the same epoll instance and handler;
 *   - connected socket: read up to 3 bytes and print them to stderr together
 *     with the event mask.
 * On EPOLLERR/EPOLLHUP/EPOLLRDHUP: deregister the socket (under ep->lock),
 * close it and free `client`. The caller must not touch `client` after this
 * function returns in that case.
 *
 * NOTE(review): ep->lock is taken here; whether the caller already holds it
 * when dispatching is not visible from this function - confirm.
 */
static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client) {
  if (ev->events & EPOLLIN) {
    if (client->state==1) {
      struct sockaddr_in si = {0};
      socklen_t silen = sizeof(si);
      int skt = accept(client->skt, (struct sockaddr*)&si, &silen);
      if (skt!=-1) {
        client_t *server = (client_t*)calloc(1, sizeof(*server));
        if (server) {
          server->skt = skt;
          server->on_event = echo_event;
          server->state = 2;
          // named differently from the `ev` parameter to avoid shadowing it
          struct epoll_event reg = {0};
          reg.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP;
          reg.data.ptr = server;
          A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &reg), "");
        } else {
          // without a context nothing would ever own or close the accepted
          // descriptor, so release it right away instead of leaking it
          E("out of memory, dropping accepted socket [%d]", skt);
          close(skt);
        }
      }
    }
    if (client->state==2) {
      char buf[4];
      // leave room for the terminating NUL added below
      int n = recv(client->skt, buf, sizeof(buf)-1, 0);
      A(n>=0 && n<sizeof(buf), "internal logic error:[%d]", n);
      buf[n] = '\0';
      fprintf(stderr, "events[%x]:%s\n", ev->events, buf);
    }
  }
  if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
    A(0==pthread_mutex_lock(&ep->lock), "");
    A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, client->skt, NULL), "");
    A(0==pthread_mutex_unlock(&ep->lock), "");
    close(client->skt);
    client->skt = -1;
    client->on_event = NULL;
    free(client);
  }
}
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册