提交 ce50e131 编写于 作者: F freemine

1. kqueue rather than SIGALRM for the timer

2. semaphore rather than dispatch_semaphore for tsem_xxx
3. fail-fast comments
4. niche
上级 0765b236
...@@ -295,6 +295,10 @@ void taos_close(TAOS *taos) { ...@@ -295,6 +295,10 @@ void taos_close(TAOS *taos) {
tscDebug("%p HB is freed", pHb); tscDebug("%p HB is freed", pHb);
taosReleaseRef(tscObjRef, pHb->self); taosReleaseRef(tscObjRef, pHb->self);
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
tsem_init(&pHb->rspSem, 0, 0);
#endif // __APPLE__
taos_free_result(pHb); taos_free_result(pHb);
} }
} }
......
...@@ -1933,6 +1933,10 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in ...@@ -1933,6 +1933,10 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
} }
if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) { if (tscAddSubqueryInfo(pCmd) != TSDB_CODE_SUCCESS) {
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
tsem_init(&pNew->rspSem, 0, 0);
#endif // __APPLE__
tscFreeSqlObj(pNew); tscFreeSqlObj(pNew);
return NULL; return NULL;
} }
......
...@@ -13,19 +13,76 @@ ...@@ -13,19 +13,76 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// fail-fast or let-it-crash philosophy
// https://en.wikipedia.org/wiki/Fail-fast
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
// experimentally, we follow log-and-crash here
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
// #define SEM_USE_PTHREAD
// #define SEM_USE_POSIX
#define SEM_USE_SEM
#ifdef SEM_USE_SEM
#include <mach/mach_init.h>
#include <mach/mach_error.h>
#include <mach/semaphore.h>
#include <mach/task.h>
static pthread_t sem_thread;
static pthread_once_t sem_once;
static task_t sem_port;
static volatile int sem_inited = 0;
static semaphore_t sem_exit;
static void* sem_thread_routine(void *arg) {
(void)arg;
sem_port = mach_task_self();
kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
if (ret != KERN_SUCCESS) {
fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", basename(__FILE__), __LINE__, __func__);
sem_inited = -1;
return NULL;
}
sem_inited = 1;
semaphore_wait(sem_exit);
return NULL;
}
static void once_init(void) {
int r = 0;
r = pthread_create(&sem_thread, NULL, sem_thread_routine, NULL);
if (r) {
fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", basename(__FILE__), __LINE__, __func__);
return;
}
while (sem_inited==0) {
;
}
}
#endif
struct tsem_s { struct tsem_s {
#ifdef SEM_USE_PTHREAD
pthread_mutex_t lock;
pthread_cond_t cond;
volatile int64_t val;
#elif defined(SEM_USE_POSIX)
size_t id;
sem_t *sem;
#elif defined(SEM_USE_SEM)
semaphore_t sem;
#else // SEM_USE_PTHREAD
dispatch_semaphore_t sem; dispatch_semaphore_t sem;
#endif // SEM_USE_PTHREAD
volatile unsigned int valid:1; volatile unsigned int valid:1;
}; };
int tsem_wait(tsem_t *sem);
int tsem_post(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
int tsem_init(tsem_t *sem, int pshared, unsigned int value) { int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", basename(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", basename(__FILE__), __LINE__, __func__, sem);
if (*sem) { if (*sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", basename(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", basename(__FILE__), __LINE__, __func__, sem);
abort(); abort();
...@@ -36,11 +93,61 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) { ...@@ -36,11 +93,61 @@ int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
abort(); abort();
} }
#ifdef SEM_USE_PTHREAD
int r = pthread_mutex_init(&p->lock, NULL);
do {
if (r) break;
r = pthread_cond_init(&p->cond, NULL);
if (r) {
pthread_mutex_destroy(&p->lock);
break;
}
p->val = value;
} while (0);
if (r) {
fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
#elif defined(SEM_USE_POSIX)
static size_t tick = 0;
do {
size_t id = atomic_add_fetch_64(&tick, 1);
if (id==SEM_VALUE_MAX) {
atomic_store_64(&tick, 0);
id = 0;
}
char name[NAME_MAX-4];
snprintf(name, sizeof(name), "/t%ld", id);
p->sem = sem_open(name, O_CREAT|O_EXCL, pshared, value);
p->id = id;
if (p->sem!=SEM_FAILED) break;
int e = errno;
if (e==EEXIST) continue;
if (e==EINTR) continue;
fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", basename(__FILE__), __LINE__, __func__, sem, e, strerror(e));
abort();
} while (p->sem==SEM_FAILED);
#elif defined(SEM_USE_SEM)
pthread_once(&sem_once, once_init);
if (sem_inited!=1) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", basename(__FILE__), __LINE__, __func__, sem);
errno = ENOMEM;
return -1;
}
kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, 0);
if (ret != KERN_SUCCESS) {
fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", basename(__FILE__), __LINE__, __func__, sem);
// we fail-fast here, because we have less-doc about semaphore_create for the moment
abort();
}
#else // SEM_USE_PTHREAD
p->sem = dispatch_semaphore_create(value); p->sem = dispatch_semaphore_create(value);
if (p->sem == NULL) { if (p->sem == NULL) {
fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", basename(__FILE__), __LINE__, __func__, sem);
abort(); abort();
} }
#endif // SEM_USE_PTHREAD
p->valid = 1; p->valid = 1;
*sem = p; *sem = p;
...@@ -58,7 +165,30 @@ int tsem_wait(tsem_t *sem) { ...@@ -58,7 +165,30 @@ int tsem_wait(tsem_t *sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem);
abort(); abort();
} }
#ifdef SEM_USE_PTHREAD
if (pthread_mutex_lock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
p->val -= 1;
if (p->val < 0) {
if (pthread_cond_wait(&p->cond, &p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
}
if (pthread_mutex_unlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
return 0;
#elif defined(SEM_USE_POSIX)
return sem_wait(p->sem);
#elif defined(SEM_USE_SEM)
return semaphore_wait(p->sem);
#else // SEM_USE_PTHREAD
return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER); return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER);
#endif // SEM_USE_PTHREAD
} }
int tsem_post(tsem_t *sem) { int tsem_post(tsem_t *sem) {
...@@ -71,21 +201,76 @@ int tsem_post(tsem_t *sem) { ...@@ -71,21 +201,76 @@ int tsem_post(tsem_t *sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem);
abort(); abort();
} }
#ifdef SEM_USE_PTHREAD
if (pthread_mutex_lock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
p->val += 1;
if (p->val <= 0) {
if (pthread_cond_signal(&p->cond)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
}
if (pthread_mutex_unlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
return 0;
#elif defined(SEM_USE_POSIX)
return sem_post(p->sem);
#elif defined(SEM_USE_SEM)
return semaphore_signal(p->sem);
#else // SEM_USE_PTHREAD
return dispatch_semaphore_signal(p->sem); return dispatch_semaphore_signal(p->sem);
#endif // SEM_USE_PTHREAD
} }
int tsem_destroy(tsem_t *sem) { int tsem_destroy(tsem_t *sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", basename(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", basename(__FILE__), __LINE__, __func__, sem);
if (!*sem) { if (!*sem) {
fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem);
abort(); // abort();
return 0; return 0;
} }
struct tsem_s *p = *sem; struct tsem_s *p = *sem;
if (!p->valid) { if (!p->valid) {
fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem);
// abort();
return 0;
}
#ifdef SEM_USE_PTHREAD
if (pthread_mutex_lock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
p->valid = 0;
if (pthread_cond_destroy(&p->cond)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
if (pthread_mutex_unlock(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
if (pthread_mutex_destroy(&p->lock)) {
fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", basename(__FILE__), __LINE__, __func__, sem);
abort();
}
#elif defined(SEM_USE_POSIX)
char name[NAME_MAX-4];
snprintf(name, sizeof(name), "/t%ld", p->id);
int r = sem_unlink(name);
if (r) {
int e = errno;
fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", basename(__FILE__), __LINE__, __func__, sem, e, strerror(e));
abort(); abort();
} }
#elif defined(SEM_USE_SEM)
semaphore_destroy(sem_port, p->sem);
#else // SEM_USE_PTHREAD
#endif // SEM_USE_PTHREAD
p->valid = 0; p->valid = 0;
free(p); free(p);
......
...@@ -13,9 +13,82 @@ ...@@ -13,9 +13,82 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// fail-fast or let-it-crash philosophy
// https://en.wikipedia.org/wiki/Fail-fast
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
// experimentally, we follow log-and-crash here
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#if 1
#include <sys/event.h>
static void (*timer_callback)(int);
static int timer_ms = 0;
static pthread_t timer_thread;
static int timer_kq = -1;
static volatile int timer_stop = 0;
static void* timer_routine(void *arg) {
(void)arg;
int r = 0;
struct timespec to = {0};
to.tv_sec = timer_ms / 1000;
to.tv_nsec = (timer_ms % 1000) * 1000000;
while (!timer_stop) {
struct kevent64_s kev[10] = {0};
r = kevent64(timer_kq, NULL, 0, kev, sizeof(kev)/sizeof(kev[0]), 0, &to);
if (r!=0) {
fprintf(stderr, "==%s[%d]%s()==kevent64 failed\n", basename(__FILE__), __LINE__, __func__);
abort();
}
timer_callback(SIGALRM); // just mock
}
return NULL;
}
int taosInitTimer(void (*callback)(int), int ms) {
int r = 0;
timer_ms = ms;
timer_callback = callback;
timer_kq = kqueue();
if (timer_kq==-1) {
fprintf(stderr, "==%s[%d]%s()==failed to create timer kq\n", basename(__FILE__), __LINE__, __func__);
// since no caller of this func checks the return value for the moment
abort();
}
r = pthread_create(&timer_thread, NULL, timer_routine, NULL);
if (r) {
fprintf(stderr, "==%s[%d]%s()==failed to create timer thread\n", basename(__FILE__), __LINE__, __func__);
// since no caller of this func checks the return value for the moment
abort();
}
return 0;
}
void taosUninitTimer() {
int r = 0;
timer_stop = 1;
r = pthread_join(timer_thread, NULL);
if (r) {
fprintf(stderr, "==%s[%d]%s()==failed to join timer thread\n", basename(__FILE__), __LINE__, __func__);
// since no caller of this func checks the return value for the moment
abort();
}
close(timer_kq);
timer_kq = -1;
}
void taos_block_sigalrm(void) {
// we don't know if there's any specific API for SIGALRM to deliver to specific thread
// this implementation relies on kqueue rather than SIGALRM
}
#else
int taosInitTimer(void (*callback)(int), int ms) { int taosInitTimer(void (*callback)(int), int ms) {
signal(SIGALRM, callback); signal(SIGALRM, callback);
...@@ -46,4 +119,5 @@ void taos_block_sigalrm(void) { ...@@ -46,4 +119,5 @@ void taos_block_sigalrm(void) {
already_set = 1; already_set = 1;
} }
} }
#endif
...@@ -13,13 +13,18 @@ ...@@ -13,13 +13,18 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
// fail-fast or let-it-crash philosophy
// https://en.wikipedia.org/wiki/Fail-fast
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
// experimentally, we follow log-and-crash here
#include "eok.h" #include "eok.h"
#include "os.h" #include "os.h"
#include <sys/event.h> #include <sys/event.h>
#define LET_IT_BE // #define BALANCE_CHECK_WHEN_CLOSE
#ifdef ENABLE_LOG #ifdef ENABLE_LOG
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__) #define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
...@@ -101,14 +106,14 @@ struct eoks_s { ...@@ -101,14 +106,14 @@ struct eoks_s {
pthread_mutex_t lock; pthread_mutex_t lock;
ep_over_kq_t **eoks; // note: this memory leaks when process terminates ep_over_kq_t **eoks; // note: this memory leaks when process terminates
int neoks; // we can add an extra api to let user clean int neoks; // we can add an extra api to let user clean
ep_over_kq_t *eoks_free; // currently, we just keep it simple stupid ep_over_kq_t *eoks_free_list; // currently, we just keep it simple stupid
}; };
static eoks_t eoks = { static eoks_t eoks = {
.lock = PTHREAD_MUTEX_INITIALIZER, .lock = PTHREAD_MUTEX_INITIALIZER,
.eoks = NULL, .eoks = NULL,
.neoks = 0, .neoks = 0,
.eoks_free = NULL, .eoks_free_list = NULL,
}; };
#ifdef ENABLE_LOG #ifdef ENABLE_LOG
...@@ -760,9 +765,9 @@ static ep_over_kq_t* eoks_alloc(void) { ...@@ -760,9 +765,9 @@ static ep_over_kq_t* eoks_alloc(void) {
A(0==pthread_mutex_lock(&eoks.lock), ""); A(0==pthread_mutex_lock(&eoks.lock), "");
do { do {
if (eoks.eoks_free) { if (eoks.eoks_free_list) {
eok = eoks.eoks_free; eok = eoks.eoks_free_list;
eoks.eoks_free = eok->next; eoks.eoks_free_list = eok->next;
A(eoks.eoks, "internal logic error"); A(eoks.eoks, "internal logic error");
A(eok->idx>=0 && eok->idx<eoks.neoks, "internal logic error"); A(eok->idx>=0 && eok->idx<eoks.neoks, "internal logic error");
A(*(eoks.eoks + eok->idx)==NULL, "internal logic error"); A(*(eoks.eoks + eok->idx)==NULL, "internal logic error");
...@@ -820,10 +825,12 @@ static void eoks_free(ep_over_kq_t *eok) { ...@@ -820,10 +825,12 @@ static void eoks_free(ep_over_kq_t *eok) {
A(eok->waiting==0, "internal logic error"); A(eok->waiting==0, "internal logic error");
eok_event_t *ev = eok->evs_head; eok_event_t *ev = eok->evs_head;
int sv_closed = 0;
while (ev) { while (ev) {
eok_event_t *next = ev->next; eok_event_t *next = ev->next;
if (ev->fd==eok->sv[0]) { if (ev->fd==eok->sv[0]) {
// fd is critical system resource // fd is critical system resource
A(sv_closed==0, "internal logic error");
close(eok->sv[0]); close(eok->sv[0]);
eok->sv[0] = -1; eok->sv[0] = -1;
close(eok->sv[1]); close(eok->sv[1]);
...@@ -832,11 +839,11 @@ static void eoks_free(ep_over_kq_t *eok) { ...@@ -832,11 +839,11 @@ static void eoks_free(ep_over_kq_t *eok) {
} else { } else {
// user forget calling epoll_ctl(EPOLL_CTL_DEL) before calling epoll_close/close? // user forget calling epoll_ctl(EPOLL_CTL_DEL) before calling epoll_close/close?
// calling close(ev->fd) here smells really bad // calling close(ev->fd) here smells really bad
#ifdef LET_IT_BE #ifndef BALANCE_CHECK_WHEN_CLOSE
// we just let it be and reclaim ev // we just let it be and reclaim ev
eok_free_ev(eok, ev); eok_free_ev(eok, ev);
#else #else
// panic otherwise, if LET_IT_BE not defined // panic otherwise, if BALANCE_CHECK_WHEN_CLOSE is defined
A(eok->evs_head==NULL && eok->evs_tail==NULL && eok->evs_count==0, A(eok->evs_head==NULL && eok->evs_tail==NULL && eok->evs_count==0,
"epfd[%d] fd[%d]: internal logic error: have you epoll_ctl(EPOLL_CTL_DEL) everything before calling epoll_close?", "epfd[%d] fd[%d]: internal logic error: have you epoll_ctl(EPOLL_CTL_DEL) everything before calling epoll_close?",
eok->idx, ev->fd); eok->idx, ev->fd);
...@@ -861,8 +868,8 @@ static void eoks_free(ep_over_kq_t *eok) { ...@@ -861,8 +868,8 @@ static void eoks_free(ep_over_kq_t *eok) {
close(eok->kq); close(eok->kq);
eok->kq = -1; eok->kq = -1;
} }
eok->next = eoks.eoks_free; eok->next = eoks.eoks_free_list;
eoks.eoks_free = eok; eoks.eoks_free_list = eok;
*(eoks.eoks + eok->idx) = NULL; *(eoks.eoks + eok->idx) = NULL;
} while (0); } while (0);
A(0==pthread_mutex_unlock(&eoks.lock), ""); A(0==pthread_mutex_unlock(&eoks.lock), "");
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册