提交 985de782 编写于 作者: F freemine

with epoll over kqueue, taos can be built on MacOSX

上级 1e7037e8
...@@ -350,6 +350,9 @@ void *shellLoopQuery(void *arg) { ...@@ -350,6 +350,9 @@ void *shellLoopQuery(void *arg) {
reset_terminal_mode(); reset_terminal_mode();
} while (shellRunCommand(con, command) == 0); } while (shellRunCommand(con, command) == 0);
tfree(command);
exitShell();
pthread_cleanup_pop(1); pthread_cleanup_pop(1);
return NULL; return NULL;
......
...@@ -70,5 +70,5 @@ int epoll_close(int epfd); ...@@ -70,5 +70,5 @@ int epoll_close(int epfd);
} }
#endif #endif
#endif // _eok_h_fd274616_996c_400e_9023_ae70be881fa3_ #endif // _eok_h_fd274616_996c_400e_9023_ae70be881fa3_
...@@ -17,8 +17,6 @@ ...@@ -17,8 +17,6 @@
#include "os.h" #include "os.h"
#include "tglobal.h" #include "tglobal.h"
#include <sys/event.h>
void osInit() { void osInit() {
if (configDir[0] == 0) { if (configDir[0] == 0) {
strcpy(configDir, "~/TDengine/cfg"); strcpy(configDir, "~/TDengine/cfg");
......
...@@ -34,20 +34,31 @@ typedef struct eok_event_s eok_event_t; ...@@ -34,20 +34,31 @@ typedef struct eok_event_s eok_event_t;
struct ep_over_kq_s { struct ep_over_kq_s {
int kq; int kq;
// !!!
// idx in the eoks list
// used as pseudo-file-desciptor
// must be 'closed' with epoll_close
int idx; int idx;
ep_over_kq_t *next; ep_over_kq_t *next;
int sv[2]; // 0 for read, 1 for write int sv[2]; // 0 for read, 1 for write
// all registered 'epoll events, key by fd'
int evs_count; int evs_count;
eok_event_t *evs_head; eok_event_t *evs_head;
eok_event_t *evs_tail; eok_event_t *evs_tail;
eok_event_t *evs_free; eok_event_t *evs_free;
// all kev changes list pending to be processed by kevent64
// key by tuple (ident,filter), ident === fd in this case
struct kevent64_s *kchanges; struct kevent64_s *kchanges;
int nchanges; int nchanges;
int ichanges; int ichanges;
// kev eventslist for kevent64 to store active events
// they remain alive among kevent64 calls
struct kevent64_s *kevslist; struct kevent64_s *kevslist;
int nevslist; int nevslist;
...@@ -76,6 +87,7 @@ struct eoks_s { ...@@ -76,6 +87,7 @@ struct eoks_s {
int neoks; int neoks;
ep_over_kq_t *eoks_free; ep_over_kq_t *eoks_free;
}; };
static eoks_t eoks = { static eoks_t eoks = {
.lock = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER,
.eoks = NULL, .eoks = NULL,
...@@ -93,8 +105,9 @@ static const char* op_str(int op) { ...@@ -93,8 +105,9 @@ static const char* op_str(int op) {
} }
static __thread char buf_slots[10][1024] = {0}; static __thread char buf_slots[10][1024] = {0};
static __thread int buf_slots_linelen = sizeof(buf_slots[0])/sizeof(buf_slots[0][0]); static __thread int buf_slots_linelen = sizeof(buf_slots[0])/sizeof(buf_slots[0][0]);
static __thread int buf_slots_count = sizeof(buf_slots)/(sizeof(buf_slots[0])/sizeof(buf_slots[0][0])); static __thread int buf_slots_count = sizeof(buf_slots)/(sizeof(buf_slots[0])/sizeof(buf_slots[0][0]));
static const char* events_str(uint32_t events, int slots) { static const char* events_str(uint32_t events, int slots) {
A(slots>=0 && slots<buf_slots_count, "internal logic error"); A(slots>=0 && slots<buf_slots_count, "internal logic error");
char *buf = buf_slots[slots]; char *buf = buf_slots[slots];
...@@ -102,6 +115,7 @@ static const char* events_str(uint32_t events, int slots) { ...@@ -102,6 +115,7 @@ static const char* events_str(uint32_t events, int slots) {
size_t len = buf_slots_linelen; size_t len = buf_slots_linelen;
int n = 0; int n = 0;
buf[0] = '\0'; buf[0] = '\0';
// copied from <sys/epoll.h> on linux
// EPOLLIN = 0x001, // EPOLLIN = 0x001,
// #define EPOLLIN EPOLLIN // #define EPOLLIN EPOLLIN
// EPOLLPRI = 0x002, // EPOLLPRI = 0x002,
...@@ -164,6 +178,7 @@ static const char* kev_flags_str(uint16_t flags, int slots) { ...@@ -164,6 +178,7 @@ static const char* kev_flags_str(uint16_t flags, int slots) {
size_t len = buf_slots_linelen; size_t len = buf_slots_linelen;
int n = 0; int n = 0;
buf[0] = '\0'; buf[0] = '\0';
// copied to <sys/event.h>
// #define EV_ADD 0x0001 /* add event to kq (implies enable) */ // #define EV_ADD 0x0001 /* add event to kq (implies enable) */
// #define EV_DELETE 0x0002 /* delete event from kq */ // #define EV_DELETE 0x0002 /* delete event from kq */
// #define EV_ENABLE 0x0004 /* enable event */ // #define EV_ENABLE 0x0004 /* enable event */
...@@ -233,10 +248,7 @@ int epoll_create(int size) { ...@@ -233,10 +248,7 @@ int epoll_create(int size) {
(void)size; (void)size;
int e = 0; int e = 0;
ep_over_kq_t *eok = eoks_alloc(); ep_over_kq_t *eok = eoks_alloc();
if (!eok) { if (!eok) return -1;
errno = ENOMEM;
return -1;
}
A(eok->kq==-1, "internal logic error"); A(eok->kq==-1, "internal logic error");
A(eok->lock_valid, "internal logic error"); A(eok->lock_valid, "internal logic error");
...@@ -263,13 +275,14 @@ int epoll_create(int size) { ...@@ -263,13 +275,14 @@ int epoll_create(int size) {
struct epoll_event ev = {0}; struct epoll_event ev = {0};
ev.events = EPOLLIN; ev.events = EPOLLIN;
ev.data.ptr = &eok_dummy; ev.data.ptr = &eok_dummy;
D("epoll_create epfd:[%d]", eok->idx);
if (epoll_ctl(eok->idx, EPOLL_CTL_ADD, eok->sv[0], &ev)) { if (epoll_ctl(eok->idx, EPOLL_CTL_ADD, eok->sv[0], &ev)) {
A(0, "internal logic error"); e = errno;
epoll_close(eok->idx); epoll_close(eok->idx);
errno = e;
return -1; return -1;
} }
D("epoll_create epfd:[%d]", eok->idx);
return eok->idx; return eok->idx;
} }
...@@ -309,36 +322,34 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { ...@@ -309,36 +322,34 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
e = ENOENT; e = ENOENT;
break; break;
} }
if (op!=EPOLL_CTL_DEL && !event) {
e = EINVAL;
break;
}
// prepare krev/kwev
struct kevent64_s krev = {0}; struct kevent64_s krev = {0};
struct kevent64_s kwev = {0}; struct kevent64_s kwev = {0};
krev.ident = -1; krev.ident = -1;
kwev.ident = -1; kwev.ident = -1;
uint16_t flags = 0; uint16_t flags = 0;
// prepare internal eok event
eok_event_t ev = {0}; eok_event_t ev = {0};
ev.fd = fd; ev.fd = fd;
if (event) ev.epev = *event; if (event) ev.epev = *event;
struct epoll_event *pev = event; struct epoll_event *pev = event;
switch (op) { switch (op) {
case EPOLL_CTL_ADD: { case EPOLL_CTL_ADD: {
if (!event) {
e = EINVAL;
break;
}
flags = EV_ADD; flags = EV_ADD;
ev.changed = 1; ev.changed = 1;
} break; } break;
case EPOLL_CTL_MOD: { case EPOLL_CTL_MOD: {
if (!event) {
e = EINVAL;
break;
}
flags = EV_ADD; flags = EV_ADD;
ev.changed = 2; ev.changed = 2;
} break; } break;
case EPOLL_CTL_DEL: { case EPOLL_CTL_DEL: {
D("epoll_ctl adding...");
// event is ignored // event is ignored
// pev points to registered epoll_event
pev = &oev->epev; pev = &oev->epev;
flags = EV_DELETE; flags = EV_DELETE;
ev.changed = 3; ev.changed = 3;
...@@ -350,6 +361,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { ...@@ -350,6 +361,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
if (e) break; if (e) break;
// udata will be delayed to be set
if (pev->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { if (pev->events & (EPOLLIN | EPOLLERR | EPOLLHUP | EPOLLRDHUP)) {
flags |= EV_EOF; flags |= EV_EOF;
EV_SET64(&krev, ev.fd, EVFILT_READ, flags, 0, 0, -1, 0, 0); EV_SET64(&krev, ev.fd, EVFILT_READ, flags, 0, 0, -1, 0, 0);
...@@ -358,6 +370,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) { ...@@ -358,6 +370,7 @@ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event) {
EV_SET64(&kwev, ev.fd, EVFILT_WRITE, flags, 0, 0, -1, 0, 0); EV_SET64(&kwev, ev.fd, EVFILT_WRITE, flags, 0, 0, -1, 0, 0);
} }
// refresh registered evlist and changelist in a transaction way
if (eok_chgs_refresh(eok, oev, &ev, &krev, &kwev)) { if (eok_chgs_refresh(eok, oev, &ev, &krev, &kwev)) {
e = errno; e = errno;
A(e, "internal logic error"); A(e, "internal logic error");
...@@ -380,11 +393,6 @@ static struct timespec do_timespec_diff(struct timespec *from, struct timespec * ...@@ -380,11 +393,6 @@ static struct timespec do_timespec_diff(struct timespec *from, struct timespec *
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) { int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) {
int e = 0; int e = 0;
if (epfd<0 || epfd>=eoks.neoks) {
errno = EBADF;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
return -1;
}
if (!events) { if (!events) {
errno = EINVAL; errno = EINVAL;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout); E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
...@@ -396,15 +404,6 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) ...@@ -396,15 +404,6 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
return -1; return -1;
} }
int r = 0;
ep_over_kq_t *eok = eoks_find(epfd);
if (!eok) {
errno = EBADF;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
return -1;
}
struct timespec abstime = {0}; struct timespec abstime = {0};
A(TIME_UTC==timespec_get(&abstime, TIME_UTC), "internal logic error"); A(TIME_UTC==timespec_get(&abstime, TIME_UTC), "internal logic error");
...@@ -412,7 +411,17 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) ...@@ -412,7 +411,17 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
if (timeout<0) timeout = 0; if (timeout<0) timeout = 0;
int64_t t = abstime.tv_nsec + timeout * 1000000; int64_t t = abstime.tv_nsec + timeout * 1000000;
abstime.tv_sec += t / 1000000000; abstime.tv_sec += t / 1000000000;
abstime.tv_nsec %= 1000000000; abstime.tv_nsec = t % 1000000000;
}
int r = 0;
ep_over_kq_t *eok = eoks_find(epfd);
if (!eok) {
errno = EBADF;
E("epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed", epfd, maxevents, timeout);
errno = EBADF;
return -1;
} }
int cnts = 0; int cnts = 0;
...@@ -431,43 +440,52 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) ...@@ -431,43 +440,52 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
struct timespec now = {0}; struct timespec now = {0};
A(TIME_UTC==timespec_get(&now, TIME_UTC), "internal logic error"); A(TIME_UTC==timespec_get(&now, TIME_UTC), "internal logic error");
struct timespec to = do_timespec_diff(&now, &abstime); struct timespec to = do_timespec_diff(&now, &abstime);
struct timespec *pto = NULL; struct timespec *pto = &to;
if (timeout!=-1) { if (timeout==-1) {
pto = &to; pto = NULL;
} }
eok->changed = 0; // taking the changelist
eok->wakenup = 0;
eok->waiting = 1;
struct kevent64_s *kchanges = eok->kchanges; struct kevent64_s *kchanges = eok->kchanges;
int nchanges = eok->nchanges; int nchanges = eok->nchanges;
int ichanges = eok->ichanges; int ichanges = eok->ichanges;
// let outside world to add changes
eok->kchanges = NULL; eok->kchanges = NULL;
eok->nchanges = 0; eok->nchanges = 0;
eok->ichanges = 0; eok->ichanges = 0;
eok->changed = 0;
eok->wakenup = 0;
eok->waiting = 1;
A(0==pthread_mutex_unlock(&eok->lock), ""); A(0==pthread_mutex_unlock(&eok->lock), "");
if (ichanges>0) { if (ichanges>0) {
D("kevent64 changing [%d] changes and waiting...", ichanges); D("kevent64 epfd[%d] changing [%d] changes and waiting...", eok->idx, ichanges);
} }
errno = 0; errno = 0;
r = kevent64(eok->kq, kchanges, ichanges, eventslist, maxevents, 0, pto); r = kevent64(eok->kq, kchanges, ichanges, eventslist, maxevents, 0, pto);
e = errno; e = errno;
if (e) { if (e) {
E("kevent64 waiting done, with r[%d]", r); E("kevent64 epfd[%d] waiting done, with r[%d]", eok->idx, r);
} }
A(0==pthread_mutex_lock(&eok->lock), ""); A(0==pthread_mutex_lock(&eok->lock), "");
eok->waiting = 0; eok->waiting = 0;
if (kchanges) { if (kchanges) {
free(kchanges); if (eok->kchanges==NULL) {
kchanges = NULL; // reuse
A(eok->nchanges==0 && eok->ichanges==0, "internal logic error");
eok->kchanges = kchanges;
eok->nchanges = nchanges;
} else {
free(kchanges);
kchanges = NULL;
}
nchanges = 0; nchanges = 0;
ichanges = 0; ichanges = 0;
} }
eok->waiting = 0;
if (r==0) { if (r==0) {
A(timeout!=-1, "internal logic error"); A(timeout!=-1, "internal logic error");
} }
...@@ -484,28 +502,37 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) ...@@ -484,28 +502,37 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
case EVFILT_READ: { case EVFILT_READ: {
A((ev->epev.events & EPOLLIN), "internal logic errro"); A((ev->epev.events & EPOLLIN), "internal logic errro");
if (ev->epev.data.ptr==&eok_dummy) { if (ev->epev.data.ptr==&eok_dummy) {
// it's coming from wakeup socket pair
char c = '\0'; char c = '\0';
A(1==recv(kev->ident, &c, 1, 0), "internal logic error"); A(1==recv(kev->ident, &c, 1, 0), "internal logic error");
A(0==memcmp(&c, "1", 1), "internal logic error"); A(0==memcmp(&c, "1", 1), "internal logic error");
D("wokenup"); D("wokenup");
continue;
} else { } else {
if (ev->changed==3) { if (ev->changed==3) {
D("already requested to delete for fd[%d]", ev->fd); D("already requested to delete for fd[%d]", ev->fd);
// TODO: write a unit test for this case
// EV_DELETE? // EV_DELETE?
continue; continue;
} }
// converting to epoll_event
// we shall collect all kevents for the uniq fd into one epoll_evnt
// but currently, taos never use EPOLLOUT
// just let it this way for the moment
struct epoll_event pev = {0}; struct epoll_event pev = {0};
pev.data.ptr = ev->epev.data.ptr; pev.data.ptr = ev->epev.data.ptr;
pev.events = EPOLLIN; pev.events = EPOLLIN;
if (kev->flags & EV_EOF) { if (kev->flags & EV_EOF) {
// take all these as EOF for the moment
pev.events |= (EPOLLHUP | EPOLLERR | EPOLLRDHUP); pev.events |= (EPOLLHUP | EPOLLERR | EPOLLRDHUP);
D("..........");
} }
// rounded to what user care
pev.events = pev.events & ev->epev.events; pev.events = pev.events & ev->epev.events;
D("events found for fd[%d]: [%04x:%s], which was registered: [%04x:%s], kev_flags: [%04x:%s]", D("events found for fd[%d]: [%04x:%s], which was registered: [%04x:%s], kev_flags: [%04x:%s]",
ev->fd, pev.events, events_str(pev.events, 0), ev->fd, pev.events, events_str(pev.events, 0),
ev->epev.events, events_str(ev->epev.events, 1), ev->epev.events, events_str(ev->epev.events, 1),
kev->flags, kev_flags_str(kev->flags, 2)); kev->flags, kev_flags_str(kev->flags, 2));
// now we get ev and store it
events[cnts++] = pev; events[cnts++] = pev;
} }
} break; } break;
...@@ -518,6 +545,8 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) ...@@ -518,6 +545,8 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
} }
} }
if (r>=0) { if (r>=0) {
// we can safely rule out delete-requested-events from the regitered evlists
// if only changelist are correctly registered
eok_event_t *p = eok->evs_head; eok_event_t *p = eok->evs_head;
while (p) { while (p) {
eok_event_t *next = p->next; eok_event_t *next = p->next;
...@@ -528,6 +557,14 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) ...@@ -528,6 +557,14 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
p = next; p = next;
} }
} }
if (cnts==0) {
// if no user-cared-events is up
// we check to see if time is up
A(TIME_UTC==timespec_get(&now, TIME_UTC), "internal logic error");
to = do_timespec_diff(&now, &abstime);
if (to.tv_sec==0 && to.tv_nsec==0) break;
// time is not up yet, continue loop
}
} while (cnts==0); } while (cnts==0);
if (cnts>0) { if (cnts>0) {
D("kevent64 waiting done with [%d] events", cnts); D("kevent64 waiting done with [%d] events", cnts);
...@@ -540,6 +577,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) ...@@ -540,6 +577,7 @@ int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
return -1; return -1;
} }
// tell user how many events are valid
return cnts; return cnts;
} }
...@@ -547,19 +585,20 @@ static struct timespec do_timespec_diff(struct timespec *from, struct timespec * ...@@ -547,19 +585,20 @@ static struct timespec do_timespec_diff(struct timespec *from, struct timespec *
struct timespec delta; struct timespec delta;
delta.tv_sec = to->tv_sec - from->tv_sec; delta.tv_sec = to->tv_sec - from->tv_sec;
delta.tv_nsec = to->tv_nsec - from->tv_nsec; delta.tv_nsec = to->tv_nsec - from->tv_nsec;
// norm and round up
while (delta.tv_nsec<0) { while (delta.tv_nsec<0) {
delta.tv_sec -= 1; delta.tv_sec -= 1;
delta.tv_nsec += 1000000000; delta.tv_nsec += 1000000000;
} }
if (delta.tv_sec < 0) {
delta.tv_sec = 0;
delta.tv_nsec = 0;
}
return delta; return delta;
} }
int epoll_close(int epfd) { int epoll_close(int epfd) {
D("epoll_closing epfd: [%d]", epfd); D("epoll_closing epfd: [%d]", epfd);
if (epfd<0 || epfd>=eoks.neoks) {
errno = EBADF;
return -1;
}
ep_over_kq_t *eok = eoks_find(epfd); ep_over_kq_t *eok = eoks_find(epfd);
if (!eok) { if (!eok) {
errno = EBADF; errno = EBADF;
...@@ -568,9 +607,11 @@ int epoll_close(int epfd) { ...@@ -568,9 +607,11 @@ int epoll_close(int epfd) {
A(0==pthread_mutex_lock(&eok->lock), ""); A(0==pthread_mutex_lock(&eok->lock), "");
do { do {
// panic if it would be double-closed
A(eok->stopping==0, "internal logic error"); A(eok->stopping==0, "internal logic error");
eok->stopping = 1; eok->stopping = 1;
A(eok->waiting, "internal logic error"); // panic if epoll_wait is pending
A(eok->waiting==0, "internal logic error");
if (eok->kq!=-1) { if (eok->kq!=-1) {
close(eok->kq); close(eok->kq);
...@@ -608,9 +649,12 @@ static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) { ...@@ -608,9 +649,12 @@ static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) {
p = eok->evs_free; p = eok->evs_free;
eok->evs_free = p->next; eok->evs_free = p->next;
p->next = NULL; p->next = NULL;
A(p->prev==NULL, "internal logic error");
} else { } else {
p = (eok_event_t*)calloc(1, sizeof(*p)); p = (eok_event_t*)calloc(1, sizeof(*p));
if (!p) return NULL; if (!p) return NULL;
A(p->next==NULL, "internal logic error");
A(p->prev==NULL, "internal logic error");
} }
// dirty link // dirty link
p->prev = eok->evs_tail; p->prev = eok->evs_tail;
...@@ -626,9 +670,9 @@ static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) { ...@@ -626,9 +670,9 @@ static eok_event_t* eok_calloc_ev(ep_over_kq_t *eok) {
static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev) { static void eok_free_ev(ep_over_kq_t *eok, eok_event_t *ev) {
if (ev->prev) ev->prev->next = ev->next; if (ev->prev) ev->prev->next = ev->next;
else eok->evs_head = ev->next; else eok->evs_head = ev->next;
ev->prev = NULL;
if (ev->next) ev->next->prev = ev->prev; if (ev->next) ev->next->prev = ev->prev;
else eok->evs_tail = ev->prev; else eok->evs_tail = ev->prev;
ev->prev = NULL;
ev->next = eok->evs_free; ev->next = eok->evs_free;
eok->evs_free = ev->next; eok->evs_free = ev->next;
...@@ -639,7 +683,7 @@ static void eok_wakeup(ep_over_kq_t *eok) { ...@@ -639,7 +683,7 @@ static void eok_wakeup(ep_over_kq_t *eok) {
if (!eok->waiting) return; if (!eok->waiting) return;
if (eok->wakenup) return; if (eok->wakenup) return;
eok->wakenup = 1; eok->wakenup = 1;
send(eok->sv[1], "1", 1, 0); A(1==send(eok->sv[1], "1", 1, 0), "");
} }
static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev, struct kevent64_s *krev, struct kevent64_s *kwev) { static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev, struct kevent64_s *krev, struct kevent64_s *kwev) {
...@@ -653,6 +697,8 @@ static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev ...@@ -653,6 +697,8 @@ static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev
struct kevent64_s *p = (struct kevent64_s*)realloc(eok->kchanges, sizeof(*p) * (eok->nchanges + 10)); struct kevent64_s *p = (struct kevent64_s*)realloc(eok->kchanges, sizeof(*p) * (eok->nchanges + 10));
if (!p) { if (!p) {
if (ev->changed==1) { if (ev->changed==1) {
// roll back
A(oev, "internal logic error");
eok_free_ev(eok, oev); eok_free_ev(eok, oev);
} }
errno = ENOMEM; errno = ENOMEM;
...@@ -662,12 +708,15 @@ static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev ...@@ -662,12 +708,15 @@ static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev
eok->nchanges += 10; eok->nchanges += 10;
} }
// copy to registered event slot
oev->fd = ev->fd; oev->fd = ev->fd;
if (ev->changed!=3) { if (ev->changed!=3) {
// if add/mod, copy epoll_event
oev->epev = ev->epev; oev->epev = ev->epev;
} }
oev->changed = ev->changed; oev->changed = ev->changed;
// copy to changes list
n = 0; n = 0;
if (krev->ident==ev->fd) { if (krev->ident==ev->fd) {
krev->udata = (uint64_t)oev; krev->udata = (uint64_t)oev;
...@@ -679,7 +728,7 @@ static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev ...@@ -679,7 +728,7 @@ static int eok_chgs_refresh(ep_over_kq_t *eok, eok_event_t *oev, eok_event_t *ev
eok->kchanges[eok->ichanges++] = *kwev; eok->kchanges[eok->ichanges++] = *kwev;
++n; ++n;
} }
D("add changes[%d] for fd[%d], and changes/registers [%d/%d]", n, ev->fd, eok->ichanges, eok->evs_count); D("add #changes[%d] for fd[%d], and now #changes/registers [%d/%d]", n, ev->fd, eok->ichanges, eok->evs_count);
return 0; return 0;
} }
...@@ -707,7 +756,10 @@ static ep_over_kq_t* eoks_alloc(void) { ...@@ -707,7 +756,10 @@ static ep_over_kq_t* eoks_alloc(void) {
} while (0); } while (0);
A(0==pthread_mutex_unlock(&eoks.lock), ""); A(0==pthread_mutex_unlock(&eoks.lock), "");
if (!eok) return NULL; if (!eok) {
errno = ENOMEM;
return NULL;
}
if (eok->lock_valid) { if (eok->lock_valid) {
return eok; return eok;
} }
...@@ -718,6 +770,7 @@ static ep_over_kq_t* eoks_alloc(void) { ...@@ -718,6 +770,7 @@ static ep_over_kq_t* eoks_alloc(void) {
} }
eoks_free(eok); eoks_free(eok);
errno = ENOMEM;
return NULL; return NULL;
} }
...@@ -725,16 +778,23 @@ static void eoks_free(ep_over_kq_t *eok) { ...@@ -725,16 +778,23 @@ static void eoks_free(ep_over_kq_t *eok) {
A(0==pthread_mutex_lock(&eoks.lock), ""); A(0==pthread_mutex_lock(&eoks.lock), "");
do { do {
A(eok->next==NULL, "internal logic error"); A(eok->next==NULL, "internal logic error");
A(eok->evs_head==NULL, "internal logic error");
// leave eok->kchanges as is
A(eok->ichanges==0, "internal logic error");
A(eok->waiting==0, "internal logic error"); A(eok->waiting==0, "internal logic error");
if (eok->sv[0]!=-1) { if (eok->evs_count==1) {
A(eok->evs_head && eok->evs_tail && eok->evs_head==eok->evs_tail, "internal logic error");
A(eok->evs_head->fd==eok->sv[0] && eok->sv[0]!=-1 && eok->sv[1]!=-1, "internal logic error");
// fd is critical system resource
close(eok->sv[0]); close(eok->sv[0]);
eok->sv[0] = -1; eok->sv[0] = -1;
}
if (eok->sv[1]!=-1) {
close(eok->sv[1]); close(eok->sv[1]);
eok->sv[1] = -1; eok->sv[1] = -1;
eok_free_ev(eok, eok->evs_head);
} }
A(eok->evs_head==NULL && eok->evs_tail==NULL && eok->evs_count==0, "internal logic error");
A(eok->sv[0]==-1 && eok->sv[1]==-1, "internal logic error");
if (eok->kq!=-1) { if (eok->kq!=-1) {
close(eok->kq); close(eok->kq);
eok->kq = -1; eok->kq = -1;
......
...@@ -58,7 +58,6 @@ void taosBlockSIGPIPE() { ...@@ -58,7 +58,6 @@ void taosBlockSIGPIPE() {
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT #ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen);
} }
......
...@@ -133,7 +133,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread ...@@ -133,7 +133,6 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
} }
pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter pThreadObj->pollFd = (int64_t)epoll_create(10); // size does not matter
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP epoll", label); tError("%s failed to create TCP epoll", label);
code = -1; code = -1;
...@@ -294,7 +293,6 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * ...@@ -294,7 +293,6 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
} }
pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter pThreadObj->pollFd = (SOCKET)epoll_create(10); // size does not matter
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
if (pThreadObj->pollFd < 0) { if (pThreadObj->pollFd < 0) {
tError("%s failed to create TCP client epoll", label); tError("%s failed to create TCP client epoll", label);
free(pThreadObj); free(pThreadObj);
...@@ -343,10 +341,6 @@ void taosCleanUpTcpClient(void *chandle) { ...@@ -343,10 +341,6 @@ void taosCleanUpTcpClient(void *chandle) {
void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
SThreadObj * pThreadObj = shandle; SThreadObj * pThreadObj = shandle;
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
fprintf(stderr, "pThreadObj->ip:%d\n", pThreadObj->ip);
fprintf(stderr, "PF_INET/AF_INET:%d/%d\n", PF_INET, AF_INET);
fprintf(stderr, "ip/port:%x/%d\n", ip, port);
SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip);
if (fd < 0) return NULL; if (fd < 0) return NULL;
...@@ -358,7 +352,6 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -358,7 +352,6 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
localPort = (uint16_t)ntohs(sin.sin_port); localPort = (uint16_t)ntohs(sin.sin_port);
} }
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd); SFdObj *pFdObj = taosMallocFdObj(pThreadObj, fd);
if (pFdObj) { if (pFdObj) {
...@@ -369,7 +362,6 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin ...@@ -369,7 +362,6 @@ void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uin
pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds); pThreadObj->label, thandle, ip, port, localPort, pFdObj, pThreadObj->numOfFds);
} else { } else {
tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno)); tError("%s failed to malloc client FdObj(%s)", pThreadObj->label, strerror(errno));
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
taosCloseSocket(fd); taosCloseSocket(fd);
} }
...@@ -492,32 +484,27 @@ static void *taosProcessTcpData(void *param) { ...@@ -492,32 +484,27 @@ static void *taosProcessTcpData(void *param) {
if (fdNum < 0) continue; if (fdNum < 0) continue;
for (int i = 0; i < fdNum; ++i) { for (int i = 0; i < fdNum; ++i) {
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
pFdObj = events[i].data.ptr; pFdObj = events[i].data.ptr;
if (events[i].events & EPOLLERR) { if (events[i].events & EPOLLERR) {
tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj); tDebug("%s %p FD:%p epoll errors", pThreadObj->label, pFdObj->thandle, pFdObj);
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
if (events[i].events & EPOLLRDHUP) { if (events[i].events & EPOLLRDHUP) {
tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj); tDebug("%s %p FD:%p RD hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
if (events[i].events & EPOLLHUP) { if (events[i].events & EPOLLHUP) {
tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj); tDebug("%s %p FD:%p hang up", pThreadObj->label, pFdObj->thandle, pFdObj);
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
taosReportBrokenLink(pFdObj); taosReportBrokenLink(pFdObj);
continue; continue;
} }
if (taosReadTcpData(pFdObj, &recvInfo) < 0) { if (taosReadTcpData(pFdObj, &recvInfo) < 0) {
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
shutdown(pFdObj->fd, SHUT_WR); shutdown(pFdObj->fd, SHUT_WR);
continue; continue;
} }
...@@ -563,7 +550,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { ...@@ -563,7 +550,6 @@ static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) {
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = pFdObj; event.data.ptr = pFdObj;
fprintf(stderr, "==%s[%d]%s()==\n", basename(__FILE__), __LINE__, __func__);
if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) { if (epoll_ctl(pThreadObj->pollFd, EPOLL_CTL_ADD, fd, &event) < 0) {
tfree(pFdObj); tfree(pFdObj);
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
......
...@@ -136,15 +136,12 @@ void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) { ...@@ -136,15 +136,12 @@ void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) {
event.events = EPOLLIN | EPOLLRDHUP; event.events = EPOLLIN | EPOLLRDHUP;
event.data.ptr = pConn; event.data.ptr = pConn;
fprintf(stderr, ">>>>>>>>>>>>>>>>>\n");
if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
fprintf(stderr, "<<<<<<<<<<<<<<<<<\n");
sError("failed to add fd:%d since %s", connFd, strerror(errno)); sError("failed to add fd:%d since %s", connFd, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
tfree(pConn); tfree(pConn);
pConn = NULL; pConn = NULL;
} else { } else {
fprintf(stderr, "<<<<<<<<<<<<<<<<<\n");
pThread->numOfFds++; pThread->numOfFds++;
sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds); sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds);
} }
...@@ -170,9 +167,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) { ...@@ -170,9 +167,7 @@ static void taosProcessBrokenLink(SConnObj *pConn) {
(*pInfo->processBrokenLink)(pConn->handleId); (*pInfo->processBrokenLink)(pConn->handleId);
pThread->numOfFds--; pThread->numOfFds--;
fprintf(stderr, "<<<<<<<<<<<<<<<<<\n");
epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL); epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
fprintf(stderr, "<<<<<<<<<<<<<<<<<\n");
sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds); sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds);
taosClose(pConn->fd); taosClose(pConn->fd);
tfree(pConn); tfree(pConn);
...@@ -287,7 +282,6 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { ...@@ -287,7 +282,6 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
pThread->pPool = pPool; pThread->pPool = pPool;
pThread->pollFd = epoll_create(10); // size does not matter pThread->pollFd = epoll_create(10); // size does not matter
fprintf(stderr, "...............\n");
if (pThread->pollFd < 0) { if (pThread->pollFd < 0) {
tfree(pThread); tfree(pThread);
return NULL; return NULL;
......
...@@ -649,13 +649,15 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) { ...@@ -649,13 +649,15 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
#ifdef __APPLE__ #ifdef __APPLE__
pRepo->readyToCommit = sem_open(NULL, O_CREAT, 0644, 1); pRepo->readyToCommit = sem_open(NULL, O_CREAT, 0644, 1);
if (!pRepo->readyToCommit) { if (pRepo->readyToCommit==SEM_FAILED) {
code = errno;
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
goto _err; goto _err;
} }
#else #else
code = sem_init(&(pRepo->readyToCommit), 0, 1); code = sem_init(&(pRepo->readyToCommit), 0, 1);
if (code != 0) { if (code != 0) {
code = errno;
terrno = TAOS_SYSTEM_ERROR(code); terrno = TAOS_SYSTEM_ERROR(code);
goto _err; goto _err;
} }
......
...@@ -35,7 +35,7 @@ int32_t taosGetFqdn(char *fqdn) { ...@@ -35,7 +35,7 @@ int32_t taosGetFqdn(char *fqdn) {
hints.ai_flags = AI_CANONNAME; hints.ai_flags = AI_CANONNAME;
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) { if (!result) {
uError("failed to get fqdn for hostname <%s>, code:%d, reason:%s", hostname, ret, gai_strerror(ret)); uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1; return -1;
} }
...@@ -342,6 +342,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) { ...@@ -342,6 +342,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
} }
#ifndef __APPLE__ #ifndef __APPLE__
// all fails on macosx
int32_t probes = 3; int32_t probes = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册