/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #define ALLOW_FORBID_FUNC #define _DEFAULT_SOURCE #include "os.h" #include "pthread.h" #include "tdef.h" #ifdef WINDOWS /* * windows implementation */ #include bool taosCheckPthreadValid(TdThread thread) { return thread.p != NULL; } void taosResetPthread(TdThread* thread) { thread->p = 0; } int64_t taosGetPthreadId(TdThread thread) { #ifdef PTW32_VERSION return pthread_getw32threadid_np(thread); #else return (int64_t)thread; #endif } int64_t taosGetSelfPthreadId() { return GetCurrentThreadId(); } bool taosComparePthread(TdThread first, TdThread second) { return first.p == second.p; } int32_t taosGetPId() { return GetCurrentProcessId(); } int32_t taosGetAppName(char* name, int32_t* len) { char filepath[1024] = {0}; GetModuleFileName(NULL, filepath, MAX_PATH); char* sub = strrchr(filepath, '.'); if (sub != NULL) { *sub = '\0'; } char* end = strrchr(filepath, TD_DIRSEP[0]); if (end == NULL) { end = filepath; } tstrncpy(name, end, TSDB_APP_NAME_LEN); if (len != NULL) { *len = (int32_t)strlen(end); } return 0; } int32_t tsem_wait(tsem_t* sem) { int ret = 0; do { ret = sem_wait(sem); } while (ret != 0 && errno == EINTR); return ret; } int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { struct timespec ts, rel; FILETIME ft_before, ft_after; int rc; rel.tv_sec = 0; rel.tv_nsec = nanosecs; GetSystemTimeAsFileTime(&ft_before); // errno = 0; rc = sem_timedwait(sem, pthread_win32_getabstime_np(&ts, &rel)); /* This should have timed out */ // assert(errno == ETIMEDOUT); // assert(rc != 0); // GetSystemTimeAsFileTime(&ft_after); // // We specified a non-zero wait. Time must advance. // if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime) // { // printf("nanoseconds: %d, rc: %d, code:0x%x. before filetime: %d, %d; after filetime: %d, %d\n", // nanosecs, rc, errno, // (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime, // (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime); // printf("time must advance during sem_timedwait."); // return 1; // } return rc; } #elif defined(_TD_DARWIN_64) /* * darwin implementation */ #include // #define SEM_USE_PTHREAD // #define SEM_USE_POSIX // #define SEM_USE_SEM // #ifdef SEM_USE_SEM // #include // #include // #include // #include // static TdThread sem_thread; // static TdThreadOnce sem_once; // static task_t sem_port; // static volatile int sem_inited = 0; // static semaphore_t sem_exit; // static void *sem_thread_routine(void *arg) { // (void)arg; // setThreadName("sem_thrd"); // sem_port = mach_task_self(); // kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0); // if (ret != KERN_SUCCESS) { // fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); // sem_inited = -1; // return NULL; // } // sem_inited = 1; // semaphore_wait(sem_exit); // return NULL; // } // static void once_init(void) { // int r = 0; // r = taosThreadCreate(&sem_thread, NULL, sem_thread_routine, NULL); // if (r) { // fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); // return; // } // while (sem_inited == 0) { // ; // } // } // #endif // struct tsem_s { // #ifdef SEM_USE_PTHREAD // TdThreadMutex lock; // TdThreadCond cond; // volatile int64_t val; // #elif defined(SEM_USE_POSIX) // size_t id; // sem_t *sem; // #elif defined(SEM_USE_SEM) // semaphore_t sem; // #else // SEM_USE_PTHREAD // dispatch_semaphore_t sem; // #endif // SEM_USE_PTHREAD // volatile unsigned int valid : 1; // }; // int tsem_init(tsem_t *sem, int pshared, unsigned int value) { // // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // if (*sem) { // fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p)); // if (!p) { // fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // abort(); // } // #ifdef SEM_USE_PTHREAD // int r = taosThreadMutexInit(&p->lock, NULL); // do { // if (r) break; // r = taosThreadCondInit(&p->cond, NULL); // if (r) { // taosThreadMutexDestroy(&p->lock); // break; // } // p->val = value; // } while (0); // if (r) { // fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // abort(); // } // #elif defined(SEM_USE_POSIX) // static size_t tick = 0; // do { // size_t id = atomic_add_fetch_64(&tick, 1); // if (id == SEM_VALUE_MAX) { // atomic_store_64(&tick, 0); // id = 0; // } // char name[NAME_MAX - 4]; // snprintf(name, sizeof(name), "/t" PRId64, id); // p->sem = sem_open(name, O_CREAT | O_EXCL, pshared, value); // p->id = id; // if (p->sem != SEM_FAILED) break; // int e = errno; // if (e == EEXIST) continue; // if (e == EINTR) continue; // fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem, // e, strerror(e)); // abort(); // } while (p->sem == SEM_FAILED); // #elif defined(SEM_USE_SEM) // taosThreadOnce(&sem_once, once_init); // if (sem_inited != 1) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__, // __func__, sem); // errno = ENOMEM; // return -1; // } // kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value); // if (ret != KERN_SUCCESS) { // fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, // __func__, // sem); // // we fail-fast here, because we have less-doc about semaphore_create for the moment // abort(); // } // #else // SEM_USE_PTHREAD // p->sem = dispatch_semaphore_create(value); // if (p->sem == NULL) { // fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // abort(); // } // #endif // SEM_USE_PTHREAD // p->valid = 1; // *sem = p; // return 0; // } // int tsem_wait(tsem_t *sem) { // if (!*sem) { // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // abort(); // } // struct tsem_s *p = *sem; // if (!p->valid) { // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); abort(); // } // #ifdef SEM_USE_PTHREAD // if (taosThreadMutexLock(&p->lock)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // p->val -= 1; // if (p->val < 0) { // if (taosThreadCondWait(&p->cond, &p->lock)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, // __func__, // sem); // abort(); // } // } // if (taosThreadMutexUnlock(&p->lock)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // return 0; // #elif defined(SEM_USE_POSIX) // return sem_wait(p->sem); // #elif defined(SEM_USE_SEM) // return semaphore_wait(p->sem); // #else // SEM_USE_PTHREAD // return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER); // #endif // SEM_USE_PTHREAD // } // int tsem_post(tsem_t *sem) { // if (!*sem) { // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // abort(); // } // struct tsem_s *p = *sem; // if (!p->valid) { // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); abort(); // } // #ifdef SEM_USE_PTHREAD // if (taosThreadMutexLock(&p->lock)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // p->val += 1; // if (p->val <= 0) { // if (taosThreadCondSignal(&p->cond)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, // __func__, // sem); // abort(); // } // } // if (taosThreadMutexUnlock(&p->lock)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // return 0; // #elif defined(SEM_USE_POSIX) // return sem_post(p->sem); // #elif defined(SEM_USE_SEM) // return semaphore_signal(p->sem); // #else // SEM_USE_PTHREAD // return dispatch_semaphore_signal(p->sem); // #endif // SEM_USE_PTHREAD // } // int tsem_destroy(tsem_t *sem) { // // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // if (!*sem) { // // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // // abort(); // return 0; // } // struct tsem_s *p = *sem; // if (!p->valid) { // // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // // sem); abort(); // return 0; // } // #ifdef SEM_USE_PTHREAD // if (taosThreadMutexLock(&p->lock)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // p->valid = 0; // if (taosThreadCondDestroy(&p->cond)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // if (taosThreadMutexUnlock(&p->lock)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // if (taosThreadMutexDestroy(&p->lock)) { // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem); // abort(); // } // #elif defined(SEM_USE_POSIX) // char name[NAME_MAX - 4]; // snprintf(name, sizeof(name), "/t" PRId64, p->id); // int r = sem_unlink(name); // if (r) { // int e = errno; // fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // sem, // e, strerror(e)); // abort(); // } // #elif defined(SEM_USE_SEM) // semaphore_destroy(sem_port, p->sem); // #else // SEM_USE_PTHREAD // #endif // SEM_USE_PTHREAD // p->valid = 0; // taosMemoryFree(p); // *sem = NULL; // return 0; // } int tsem_init(tsem_t *psem, int flags, unsigned int count) { *psem = dispatch_semaphore_create(count); if (*psem == NULL) return -1; return 0; } int tsem_destroy(tsem_t *psem) { if (psem == NULL || *psem == NULL) return -1; dispatch_release(*psem); *psem = NULL; return 0; } int tsem_post(tsem_t *psem) { if (psem == NULL || *psem == NULL) return -1; dispatch_semaphore_signal(*psem); return 0; } int tsem_wait(tsem_t *psem) { if (psem == NULL || *psem == NULL) return -1; dispatch_semaphore_wait(*psem, DISPATCH_TIME_FOREVER); return 0; } int tsem_timewait(tsem_t *psem, int64_t nanosecs) { if (psem == NULL || *psem == NULL) return -1; dispatch_semaphore_wait(*psem, nanosecs); return 0; } bool taosCheckPthreadValid(TdThread thread) { return thread != 0; } int64_t taosGetSelfPthreadId() { TdThread thread = taosThreadSelf(); return (int64_t)thread; } int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; } void taosResetPthread(TdThread *thread) { *thread = NULL; } bool taosComparePthread(TdThread first, TdThread second) { return taosThreadEqual(first, second) ? true : false; } int32_t taosGetPId() { return (int32_t)getpid(); } int32_t taosGetAppName(char *name, int32_t *len) { char buf[PATH_MAX + 1]; buf[0] = '\0'; proc_name(getpid(), buf, sizeof(buf) - 1); buf[PATH_MAX] = '\0'; size_t n = strlen(buf); if (len) *len = n; if (name) tstrncpy(name, buf, TSDB_APP_NAME_LEN); return 0; } #else /* * linux implementation */ #include #include bool taosCheckPthreadValid(TdThread thread) { return thread != 0; } int64_t taosGetSelfPthreadId() { static __thread int id = 0; if (id != 0) return id; id = syscall(SYS_gettid); return id; } int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; } void taosResetPthread(TdThread* thread) { *thread = 0; } bool taosComparePthread(TdThread first, TdThread second) { return first == second; } int32_t taosGetPId() { static int32_t pid; if (pid != 0) return pid; pid = getpid(); return pid; } int32_t taosGetAppName(char* name, int32_t* len) { const char* self = "/proc/self/exe"; char path[PATH_MAX] = {0}; if (readlink(self, path, PATH_MAX) <= 0) { return -1; } path[PATH_MAX - 1] = 0; char* end = strrchr(path, '/'); if (end == NULL) { return -1; } ++end; tstrncpy(name, end, TSDB_APP_NAME_LEN); if (len != NULL) { *len = strlen(name); } return 0; } int32_t tsem_wait(tsem_t* sem) { int ret = 0; do { ret = sem_wait(sem); } while (ret != 0 && errno == EINTR); return ret; } int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { int ret = 0; struct timespec tv = { .tv_sec = 0, .tv_nsec = nanosecs, }; while ((ret = sem_timedwait(sem, &tv)) == -1 && errno == EINTR) continue; return ret; } #endif