diff --git a/src/os/src/darwin/eok.c b/src/os/src/darwin/eok.c index ea67223c25e30da417e36f2349c86a0ce0c81ccb..f83846b73436987d7848a836ee0b153a5dc9393d 100644 --- a/src/os/src/darwin/eok.c +++ b/src/os/src/darwin/eok.c @@ -39,6 +39,7 @@ struct ep_over_kq_s { int sv[2]; // 0 for read, 1 for write + int evs_count; eok_event_t *evs_head; eok_event_t *evs_tail; eok_event_t *evs_free; @@ -82,6 +83,139 @@ static eoks_t eoks = { .eoks_free = NULL, }; +static const char* op_str(int op) { + switch (op) { + case EPOLL_CTL_ADD: return "EPOLL_CTL_ADD"; + case EPOLL_CTL_MOD: return "EPOLL_CTL_MOD"; + case EPOLL_CTL_DEL: return "EPOLL_CTL_DEL"; + default: return "UNKNOWN"; + } +} + +static __thread char buf_slots[10][1024] = {0}; +static __thread int buf_slots_linelen = sizeof(buf_slots[0])/sizeof(buf_slots[0][0]); +static __thread int buf_slots_count = sizeof(buf_slots)/(sizeof(buf_slots[0])/sizeof(buf_slots[0][0])); +static const char* events_str(uint32_t events, int slots) { + A(slots>=0 && slots0 && (events & (ev))==(ev)) { \ + n = snprintf(p, len, "%s%s", p!=buf ? "|" : "", #ev); \ + p += n; \ + len -= n; \ + } + CHK_EV(EPOLLIN); + CHK_EV(EPOLLPRI); + CHK_EV(EPOLLOUT); + CHK_EV(EPOLLRDNORM); + CHK_EV(EPOLLRDBAND); + CHK_EV(EPOLLWRNORM); + CHK_EV(EPOLLWRBAND); + CHK_EV(EPOLLMSG); + CHK_EV(EPOLLERR); + CHK_EV(EPOLLHUP); + CHK_EV(EPOLLRDHUP); + CHK_EV(EPOLLEXCLUSIVE); + CHK_EV(EPOLLWAKEUP); + CHK_EV(EPOLLONESHOT); + CHK_EV(EPOLLET); +#undef CHK_EV + return buf; +} + +static const char* kev_flags_str(uint16_t flags, int slots) { + A(slots>=0 && slots0 && (flags & (ev))==(ev)) { \ + n = snprintf(p, len, "%s%s", p!=buf ? "|" : "", #ev); \ + p += n; \ + len -= n; \ + } + CHK_EV(EV_ADD); + CHK_EV(EV_DELETE); + CHK_EV(EV_ENABLE); + CHK_EV(EV_DISABLE); + CHK_EV(EV_ONESHOT); + CHK_EV(EV_CLEAR); + CHK_EV(EV_RECEIPT); + CHK_EV(EV_DISPATCH); + CHK_EV(EV_UDATA_SPECIFIC); + CHK_EV(EV_DISPATCH2); + CHK_EV(EV_VANISHED); + CHK_EV(EV_SYSFLAGS); + CHK_EV(EV_FLAG0); + CHK_EV(EV_FLAG1); + CHK_EV(EV_EOF); + CHK_EV(EV_ERROR); +#undef CHK_EV + return buf; +} + static ep_over_kq_t* eoks_alloc(void); static void eoks_free(ep_over_kq_t *eok); static ep_over_kq_t* eoks_find(int epfd); @@ -135,10 +269,15 @@ int epoll_create(int size) { return -1; } + D("epoll_create epfd:[%d]", eok->idx); return eok->idx; } int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { + D("epoll_ctling epfd:[%d], op:[%s], fd:[%d], events:[%04x:%s]", + epfd, op_str(op), fd, + event ? event->events : 0, + event ? events_str(event->events, 0) : "NULL"); int e = 0; if (epfd<0 || epfd>=eoks.neoks) { errno = EBADF; @@ -198,6 +337,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { ev.changed = 2; } break; case EPOLL_CTL_DEL: { + D("epoll_ctl adding..."); // event is ignored pev = &oev->epev; flags = EV_DELETE; @@ -213,11 +353,9 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { if (pev->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { flags |= EV_EOF; EV_SET64(&krev, ev.fd, EVFILT_READ, flags, 0, 0, -1, 0, 0); - D("...."); } if (pev->events & EPOLLOUT) { EV_SET64(&kwev, ev.fd, EVFILT_WRITE, flags, 0, 0, -1, 0, 0); - D("...."); } if (eok_chgs_refresh(eok, oev, &ev, &krev, &kwev)) { @@ -244,14 +382,17 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) int e = 0; if (epfd<0 || epfd>=eoks.neoks) { errno = EBADF; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); return -1; } if (!events) { errno = EINVAL; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); return -1; } if (maxevents<=0) { errno = EINVAL; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); return -1; } @@ -260,6 +401,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) ep_over_kq_t *eok = eoks_find(epfd); if (!eok) { errno = EBADF; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); return -1; } @@ -281,6 +423,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) struct kevent64_s *eventslist = eok_alloc_eventslist(eok, maxevents); if (!eventslist) { e = ENOMEM; + E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); break; } memset(eventslist, 0, maxevents * sizeof(*eventslist)); @@ -305,8 +448,15 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) eok->ichanges = 0; A(0==pthread_mutex_unlock(&eok->lock), ""); + if (ichanges>0) { + D("kevent64 changing [%d] changes and waiting...", ichanges); + } + errno = 0; r = kevent64(eok->kq, kchanges, ichanges, eventslist, maxevents, 0, pto); e = errno; + if (e) { + E("kevent64 waiting done, with r[%d]", r); + } A(0==pthread_mutex_lock(&eok->lock), ""); eok->waiting = 0; @@ -318,7 +468,6 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) } eok->waiting = 0; - if (r<0) break; if (r==0) { A(timeout!=-1, "internal logic error"); } @@ -327,7 +476,10 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) A(kev->udata && eok->evs_head && eok->evs_tail, "internal logic error"); eok_event_t *ev = (eok_event_t*)kev->udata; A(kev->ident == ev->fd, "internal logic error"); - D("..."); + if (kev->flags & EV_ERROR) { + D("error when processing change list for fd[%d], error[%s], kev_flags:[%04x:%s]", + ev->fd, strerror(kev->data), kev->flags, kev_flags_str(kev->flags, 0)); + } switch (kev->filter) { case EVFILT_READ: { A((ev->epev.events & EPOLLIN), "internal logic errro"); @@ -335,10 +487,10 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) char c = '\0'; A(1==recv(kev->ident, &c, 1, 0), "internal logic error"); A(0==memcmp(&c, "1", 1), "internal logic error"); - D("..............."); + D("wokenup"); } else { if (ev->changed==3) { - D("already requested to delete"); + D("already requested to delete for fd[%d]", ev->fd); // EV_DELETE? continue; } @@ -347,8 +499,13 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) pev.events = EPOLLIN; if (kev->flags & EV_EOF) { pev.events |= (EPOLLHUP | EPOLLERR | EPOLLRDHUP); + D(".........."); } - pev.events &= ev->epev.events; + pev.events = pev.events & ev->epev.events; + D("events found for fd[%d]: [%04x:%s], which was registered: [%04x:%s], kev_flags: [%04x:%s]", + ev->fd, pev.events, events_str(pev.events, 0), + ev->epev.events, events_str(ev->epev.events, 1), + kev->flags, kev_flags_str(kev->flags, 2)); events[cnts++] = pev; } } break; @@ -360,11 +517,26 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) } break; } } + if (r>=0) { + eok_event_t *p = eok->evs_head; + while (p) { + eok_event_t *next = p->next; + if (p->changed==3) { + D("removing registered event for fd[%d]: [%04x:%s]", p->fd, p->epev.events, events_str(p->epev.events, 0)); + eok_free_ev(eok, p); + } + p = next; + } + } } while (cnts==0); + if (cnts>0) { + D("kevent64 waiting done with [%d] events", cnts); + } A(0==pthread_mutex_unlock(&eok->lock), ""); if (e) { errno = e; + E("epoll_wait failed"); return -1; } @@ -383,6 +555,7 @@ static struct timespec do_timespec_diff(struct timespec *from, struct timespec * } int epoll_close(int epfd) { + D("epoll_closing epfd: [%d]", epfd); if (epfd<0 || epfd>=eoks.neoks) { errno = EBADF; return -1; @@ -445,6 +618,8 @@ static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) { else eok->evs_head = p; eok->evs_tail = p; + eok->evs_count += 1; + return p; } @@ -456,6 +631,8 @@ static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev) { else eok->evs_tail = ev->prev; ev->next = eok->evs_free; eok->evs_free = ev->next; + + eok->evs_count -= 1; } static void eok_wakeup(ep_over_kq_t *eok) { @@ -486,17 +663,23 @@ static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev } oev->fd = ev->fd; - oev->epev = ev->epev; + if (ev->changed!=3) { + oev->epev = ev->epev; + } oev->changed = ev->changed; + n = 0; if (krev->ident==ev->fd) { krev->udata = (uint64_t)oev; eok->kchanges[eok->ichanges++] = *krev; + ++n; } if (kwev->ident==ev->fd) { kwev->udata = (uint64_t)oev; eok->kchanges[eok->ichanges++] = *kwev; + ++n; } + D("add changes[%d] for fd[%d], and changes/registers [%d/%d]", n, ev->fd, eok->ichanges, eok->evs_count); return 0; } diff --git a/tests/examples/c/epoll.c b/tests/examples/c/epoll.c index e0d63bae322839a77dff7c8e9c31b44840e9294c..6047c9e4eac3e30f943e132e57a317d68657ee20 100644 --- a/tests/examples/c/epoll.c +++ b/tests/examples/c/epoll.c @@ -54,18 +54,74 @@ static ep_t* ep_create(void); static void ep_destroy(ep_t *ep); static void* routine(void* arg); static int open_connect(unsigned short port); +static int open_listen(unsigned short port); + +typedef struct client_s client_t; +struct client_s { + int skt; + void (*on_event)(ep_t *ep, struct epoll_event *events, client_t *client); + volatile unsigned int state; // 1: listenning; 2: connected +}; + +static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client); int main(int argc, char *argv[]) { ep_t* ep = ep_create(); A(ep, "failed"); int skt = open_connect(6789); if (skt!=-1) { - struct epoll_event ev = {0}; - ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; - ev.data.ptr = &skt; - A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); + client_t *client = (client_t*)calloc(1, sizeof(*client)); + if (client) { + client->skt = skt; + client->on_event = echo_event; + client->state = 2; + struct epoll_event ev = {0}; + ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + ev.data.ptr = client; + A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); + } + } + skt = open_listen(0); + if (skt!=-1) { + client_t *client = (client_t*)calloc(1, sizeof(*client)); + if (client) { + client->skt = skt; + client->on_event = echo_event; + client->state = 1; + struct epoll_event ev = {0}; + ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + ev.data.ptr = client; + A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); + } + } + // char c = '\0'; + // while ((c=getchar())!=EOF) { + // switch (c) { + // case 'q': break; + // default: continue; + // } + // } + // getchar(); + char *line = NULL; + size_t linecap = 0; + ssize_t linelen; + while ((linelen = getline(&line, &linecap, stdin)) > 0) { + line[strlen(line)-1] = '\0'; + if (0==strcmp(line, "exit")) break; + if (0==strcmp(line, "quit")) break; + if (line==strstr(line, "close")) { + int fd = 0; + sscanf(line, "close %d", &fd); + if (fd<=2) { + fprintf(stderr, "fd [%d] invalid\n", fd); + continue; + } + A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, fd, NULL), ""); + continue; + } + if (strlen(line)==0) continue; + fprintf(stderr, "unknown cmd:[%s]\n", line); } - getchar(); ep_destroy(ep); D(""); return 0; @@ -114,7 +170,7 @@ static void* routine(void* arg) { A(0==pthread_mutex_unlock(&ep->lock), ""); int r = epoll_wait(ep->ep, evs, sizeof(evs)/sizeof(evs[0]), -1); - A(r>0, "indefinite epoll_wait shall not timeout"); + A(r>0, "indefinite epoll_wait shall not timeout:[%d]", r); A(0==pthread_mutex_lock(&ep->lock), ""); A(ep->waiting==1, "internal logic error"); @@ -133,23 +189,50 @@ static void* routine(void* arg) { continue; } A(ev->data.ptr, "internal logic error"); - int skt = *(int*)ev->data.ptr; - if (ev->events & EPOLLIN) { - char buf[4]; - int n = recv(skt, buf, sizeof(buf)-1, 0); - A(n>=0 && nevents, buf); - } - if (ev->events & EPOLLRDHUP) { - A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, skt, NULL), ""); - } + client_t *client = (client_t*)ev->data.ptr; + client->on_event(ep, ev, client); continue; } } return NULL; } +static int open_listen(unsigned short port) { + int r = 0; + int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); + if (skt==-1) { + E("socket() failed"); + return -1; + } + do { + struct sockaddr_in si = {0}; + si.sin_family = AF_INET; + si.sin_addr.s_addr = inet_addr("127.0.0.1"); + si.sin_port = htons(port); + r = bind(skt, (struct sockaddr*)&si, sizeof(si)); + if (r) { + E("bind(%u) failed", port); + break; + } + r = listen(skt, 100); + if (r) { + E("listen() failed"); + break; + } + memset(&si, 0, sizeof(si)); + socklen_t len = sizeof(si); + r = getsockname(skt, (struct sockaddr *)&si, &len); + if (r) { + E("getsockname() failed"); + } + A(len==sizeof(si), "internal logic error"); + D("listenning at: %d", ntohs(si.sin_port)); + return skt; + } while (0); + close(skt); + return -1; +} + static int open_connect(unsigned short port) { int r = 0; int skt = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); @@ -181,3 +264,41 @@ static int open_connect(unsigned short port) { return -1; } +static void echo_event(ep_t *ep, struct epoll_event *ev, client_t *client) { + if (ev->events & EPOLLIN) { + if (client->state==1) { + struct sockaddr_in si = {0}; + socklen_t silen = sizeof(si); + int skt = accept(client->skt, (struct sockaddr*)&si, &silen); + if (skt!=-1) { + client_t *server = (client_t*)calloc(1, sizeof(*server)); + if (server) { + server->skt = skt; + server->on_event = echo_event; + server->state = 2; + struct epoll_event ev = {0}; + ev.events = EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP; + ev.data.ptr = server; + A(0==epoll_ctl(ep->ep, EPOLL_CTL_ADD, skt, &ev), ""); + } + } + } + if (client->state==2) { + char buf[4]; + int n = recv(client->skt, buf, sizeof(buf)-1, 0); + A(n>=0 && nevents, buf); + } + } + if (ev->events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { + A(0==pthread_mutex_lock(&ep->lock), ""); + A(0==epoll_ctl(ep->ep, EPOLL_CTL_DEL, client->skt, NULL), ""); + A(0==pthread_mutex_unlock(&ep->lock), ""); + close(client->skt); + client->skt = -1; + client->on_event = NULL; + free(client); + } +} +