未验证 提交 c02f1e08 编写于 作者: wafwerar's avatar wafwerar 提交者: GitHub

Merge pull request #16420 from taosdata/fix/ZhiqiangWang/TD-13064-fix-mac-semaphore-error

os: fix Mac Semaphore error
...@@ -2,8 +2,6 @@ cmake_minimum_required(VERSION 3.0) ...@@ -2,8 +2,6 @@ cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE OFF) set(CMAKE_VERBOSE_MAKEFILE OFF)
SET(BUILD_SHARED_LIBS "OFF")
#set output directory #set output directory
SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib) SET(LIBRARY_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/lib)
SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/bin) SET(EXECUTABLE_OUTPUT_PATH ${PROJECT_BINARY_DIR}/build/bin)
......
SET(PREPARE_ENV_CMD "prepare_env_cmd")
SET(PREPARE_ENV_TARGET "prepare_env_target")
ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
POST_BUILD
COMMAND echo "make test directory"
DEPENDS taosd
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMAND ${CMAKE_COMMAND} -E echo monitor 0 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
COMMENT "prepare taosd environment")
ADD_CUSTOM_TARGET(${PREPARE_ENV_TARGET} ALL WORKING_DIRECTORY ${TD_EXECUTABLE_OUTPUT_PATH} DEPENDS ${PREPARE_ENV_CMD})
IF (TD_LINUX) IF (TD_LINUX)
SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.sh") SET(TD_MAKE_INSTALL_SH "${TD_SOURCE_DIR}/packaging/tools/make_install.sh")
INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")") INSTALL(CODE "MESSAGE(\"make install script: ${TD_MAKE_INSTALL_SH}\")")
......
...@@ -90,6 +90,12 @@ ELSE () ...@@ -90,6 +90,12 @@ ELSE ()
ENDIF () ENDIF ()
ENDIF () ENDIF ()
option(
BUILD_SHARED_LIBS
""
OFF
)
option( option(
RUST_BINDINGS RUST_BINDINGS
"If build with rust-bindings" "If build with rust-bindings"
......
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation. * or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT * This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. * FITNESS FOR A PARTICULAR PURPOSE.
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_OS_SEMPHONE_H_ #ifndef _TD_OS_SEMPHONE_H_
#define _TD_OS_SEMPHONE_H_ #define _TD_OS_SEMPHONE_H_
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include <semaphore.h> #include <semaphore.h>
#if defined(_TD_DARWIN_64) #if defined(_TD_DARWIN_64)
#include <dispatch/dispatch.h>
// typedef struct tsem_s *tsem_t; // typedef struct tsem_s *tsem_t;
typedef struct bosal_sem_t *tsem_t; typedef dispatch_semaphore_t tsem_t;
int tsem_init(tsem_t *sem, int pshared, unsigned int value);
int tsem_init(tsem_t *sem, int pshared, unsigned int value); int tsem_wait(tsem_t *sem);
int tsem_wait(tsem_t *sem); int tsem_timewait(tsem_t *sim, int64_t nanosecs);
int tsem_timewait(tsem_t *sim, int64_t nanosecs); int tsem_post(tsem_t *sem);
int tsem_post(tsem_t *sem); int tsem_destroy(tsem_t *sem);
int tsem_destroy(tsem_t *sem);
#else
#else
#define tsem_t sem_t
#define tsem_t sem_t #define tsem_init sem_init
#define tsem_init sem_init int tsem_wait(tsem_t *sem);
int tsem_wait(tsem_t *sem); int tsem_timewait(tsem_t *sim, int64_t nanosecs);
int tsem_timewait(tsem_t *sim, int64_t nanosecs); #define tsem_post sem_post
#define tsem_post sem_post #define tsem_destroy sem_destroy
#define tsem_destroy sem_destroy
#endif
#endif
#if defined(_TD_DARWIN_64)
#if defined(_TD_DARWIN_64) // #define TdThreadRwlock TdThreadMutex
// #define TdThreadRwlock TdThreadMutex // #define taosThreadRwlockInit(lock, NULL) taosThreadMutexInit(lock, NULL)
// #define taosThreadRwlockInit(lock, NULL) taosThreadMutexInit(lock, NULL) // #define taosThreadRwlockDestroy(lock) taosThreadMutexDestroy(lock)
// #define taosThreadRwlockDestroy(lock) taosThreadMutexDestroy(lock) // #define taosThreadRwlockWrlock(lock) taosThreadMutexLock(lock)
// #define taosThreadRwlockWrlock(lock) taosThreadMutexLock(lock) // #define taosThreadRwlockRdlock(lock) taosThreadMutexLock(lock)
// #define taosThreadRwlockRdlock(lock) taosThreadMutexLock(lock) // #define taosThreadRwlockUnlock(lock) taosThreadMutexUnlock(lock)
// #define taosThreadRwlockUnlock(lock) taosThreadMutexUnlock(lock)
// #define TdThreadSpinlock TdThreadMutex
// #define TdThreadSpinlock TdThreadMutex // #define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL)
// #define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL) // #define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock)
// #define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock) // #define taosThreadSpinLock(lock) taosThreadMutexLock(lock)
// #define taosThreadSpinLock(lock) taosThreadMutexLock(lock) // #define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock)
// #define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock) #endif
#endif
bool taosCheckPthreadValid(TdThread thread);
bool taosCheckPthreadValid(TdThread thread); int64_t taosGetSelfPthreadId();
int64_t taosGetSelfPthreadId(); int64_t taosGetPthreadId(TdThread thread);
int64_t taosGetPthreadId(TdThread thread); void taosResetPthread(TdThread *thread);
void taosResetPthread(TdThread *thread); bool taosComparePthread(TdThread first, TdThread second);
bool taosComparePthread(TdThread first, TdThread second); int32_t taosGetPId();
int32_t taosGetPId(); int32_t taosGetAppName(char *name, int32_t *len);
int32_t taosGetAppName(char *name, int32_t *len);
#ifdef __cplusplus
#ifdef __cplusplus }
} #endif
#endif
#endif /*_TD_OS_SEMPHONE_H_*/
#endif /*_TD_OS_SEMPHONE_H_*/
/* /*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com> * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
* *
* This program is free software: you can use, redistribute, and/or modify * This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3 * it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation. * or later ("AGPL"), as published by the Free Software Foundation.
* *
* This program is distributed in the hope that it will be useful, but WITHOUT * This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. * FITNESS FOR A PARTICULAR PURPOSE.
* *
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#define ALLOW_FORBID_FUNC #define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "pthread.h" #include "pthread.h"
#include "tdef.h" #include "tdef.h"
#ifdef WINDOWS #ifdef WINDOWS
/* /*
* windows implementation * windows implementation
*/ */
#include <windows.h> #include <windows.h>
bool taosCheckPthreadValid(TdThread thread) { return thread.p != NULL; } bool taosCheckPthreadValid(TdThread thread) { return thread.p != NULL; }
void taosResetPthread(TdThread* thread) { thread->p = 0; } void taosResetPthread(TdThread* thread) { thread->p = 0; }
int64_t taosGetPthreadId(TdThread thread) { int64_t taosGetPthreadId(TdThread thread) {
#ifdef PTW32_VERSION #ifdef PTW32_VERSION
return pthread_getw32threadid_np(thread); return pthread_getw32threadid_np(thread);
#else #else
return (int64_t)thread; return (int64_t)thread;
#endif #endif
} }
int64_t taosGetSelfPthreadId() { return GetCurrentThreadId(); } int64_t taosGetSelfPthreadId() { return GetCurrentThreadId(); }
bool taosComparePthread(TdThread first, TdThread second) { return first.p == second.p; } bool taosComparePthread(TdThread first, TdThread second) { return first.p == second.p; }
int32_t taosGetPId() { return GetCurrentProcessId(); } int32_t taosGetPId() { return GetCurrentProcessId(); }
int32_t taosGetAppName(char* name, int32_t* len) { int32_t taosGetAppName(char* name, int32_t* len) {
char filepath[1024] = {0}; char filepath[1024] = {0};
GetModuleFileName(NULL, filepath, MAX_PATH); GetModuleFileName(NULL, filepath, MAX_PATH);
char* sub = strrchr(filepath, '.'); char* sub = strrchr(filepath, '.');
if (sub != NULL) { if (sub != NULL) {
*sub = '\0'; *sub = '\0';
} }
char* end = strrchr(filepath, TD_DIRSEP[0]); char* end = strrchr(filepath, TD_DIRSEP[0]);
if (end == NULL) { if (end == NULL) {
end = filepath; end = filepath;
} }
tstrncpy(name, end, TSDB_APP_NAME_LEN); tstrncpy(name, end, TSDB_APP_NAME_LEN);
if (len != NULL) { if (len != NULL) {
*len = (int32_t)strlen(end); *len = (int32_t)strlen(end);
} }
return 0; return 0;
} }
int32_t tsem_wait(tsem_t* sem) { int32_t tsem_wait(tsem_t* sem) {
int ret = 0; int ret = 0;
do { do {
ret = sem_wait(sem); ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR); } while (ret != 0 && errno == EINTR);
return ret; return ret;
} }
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
struct timespec ts, rel; struct timespec ts, rel;
FILETIME ft_before, ft_after; FILETIME ft_before, ft_after;
int rc; int rc;
rel.tv_sec = 0; rel.tv_sec = 0;
rel.tv_nsec = nanosecs; rel.tv_nsec = nanosecs;
GetSystemTimeAsFileTime(&ft_before); GetSystemTimeAsFileTime(&ft_before);
// errno = 0; // errno = 0;
rc = sem_timedwait(sem, pthread_win32_getabstime_np(&ts, &rel)); rc = sem_timedwait(sem, pthread_win32_getabstime_np(&ts, &rel));
/* This should have timed out */ /* This should have timed out */
// assert(errno == ETIMEDOUT); // assert(errno == ETIMEDOUT);
// assert(rc != 0); // assert(rc != 0);
// GetSystemTimeAsFileTime(&ft_after); // GetSystemTimeAsFileTime(&ft_after);
// // We specified a non-zero wait. Time must advance. // // We specified a non-zero wait. Time must advance.
// if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime) // if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime)
// { // {
// printf("nanoseconds: %d, rc: %d, code:0x%x. before filetime: %d, %d; after filetime: %d, %d\n", // printf("nanoseconds: %d, rc: %d, code:0x%x. before filetime: %d, %d; after filetime: %d, %d\n",
// nanosecs, rc, errno, // nanosecs, rc, errno,
// (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime, // (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime,
// (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime); // (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime);
// printf("time must advance during sem_timedwait."); // printf("time must advance during sem_timedwait.");
// return 1; // return 1;
// } // }
return rc; return rc;
} }
#elif defined(_TD_DARWIN_64) #elif defined(_TD_DARWIN_64)
/* /*
* darwin implementation * darwin implementation
*/ */
#include <libproc.h> #include <libproc.h>
// #define SEM_USE_PTHREAD // #define SEM_USE_PTHREAD
// #define SEM_USE_POSIX // #define SEM_USE_POSIX
// #define SEM_USE_SEM // #define SEM_USE_SEM
// #ifdef SEM_USE_SEM // #ifdef SEM_USE_SEM
// #include <mach/mach_error.h> // #include <mach/mach_error.h>
// #include <mach/mach_init.h> // #include <mach/mach_init.h>
// #include <mach/semaphore.h> // #include <mach/semaphore.h>
// #include <mach/task.h> // #include <mach/task.h>
// static TdThread sem_thread; // static TdThread sem_thread;
// static TdThreadOnce sem_once; // static TdThreadOnce sem_once;
// static task_t sem_port; // static task_t sem_port;
// static volatile int sem_inited = 0; // static volatile int sem_inited = 0;
// static semaphore_t sem_exit; // static semaphore_t sem_exit;
// static void *sem_thread_routine(void *arg) { // static void *sem_thread_routine(void *arg) {
// (void)arg; // (void)arg;
// setThreadName("sem_thrd"); // setThreadName("sem_thrd");
// sem_port = mach_task_self(); // sem_port = mach_task_self();
// kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0); // kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
// if (ret != KERN_SUCCESS) { // if (ret != KERN_SUCCESS) {
// fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); // fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
// sem_inited = -1; // sem_inited = -1;
// return NULL; // return NULL;
// } // }
// sem_inited = 1; // sem_inited = 1;
// semaphore_wait(sem_exit); // semaphore_wait(sem_exit);
// return NULL; // return NULL;
// } // }
// static void once_init(void) { // static void once_init(void) {
// int r = 0; // int r = 0;
// r = taosThreadCreate(&sem_thread, NULL, sem_thread_routine, NULL); // r = taosThreadCreate(&sem_thread, NULL, sem_thread_routine, NULL);
// if (r) { // if (r) {
// fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__); // fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
// return; // return;
// } // }
// while (sem_inited == 0) { // while (sem_inited == 0) {
// ; // ;
// } // }
// } // }
// #endif // #endif
// struct tsem_s { // struct tsem_s {
// #ifdef SEM_USE_PTHREAD // #ifdef SEM_USE_PTHREAD
// TdThreadMutex lock; // TdThreadMutex lock;
// TdThreadCond cond; // TdThreadCond cond;
// volatile int64_t val; // volatile int64_t val;
// #elif defined(SEM_USE_POSIX) // #elif defined(SEM_USE_POSIX)
// size_t id; // size_t id;
// sem_t *sem; // sem_t *sem;
// #elif defined(SEM_USE_SEM) // #elif defined(SEM_USE_SEM)
// semaphore_t sem; // semaphore_t sem;
// #else // SEM_USE_PTHREAD // #else // SEM_USE_PTHREAD
// dispatch_semaphore_t sem; // dispatch_semaphore_t sem;
// #endif // SEM_USE_PTHREAD // #endif // SEM_USE_PTHREAD
// volatile unsigned int valid : 1; // volatile unsigned int valid : 1;
// }; // };
// int tsem_init(tsem_t *sem, int pshared, unsigned int value) { // int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// if (*sem) { // if (*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p)); // struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p));
// if (!p) { // if (!p) {
// fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort(); // abort();
// } // }
// #ifdef SEM_USE_PTHREAD // #ifdef SEM_USE_PTHREAD
// int r = taosThreadMutexInit(&p->lock, NULL); // int r = taosThreadMutexInit(&p->lock, NULL);
// do { // do {
// if (r) break; // if (r) break;
// r = taosThreadCondInit(&p->cond, NULL); // r = taosThreadCondInit(&p->cond, NULL);
// if (r) { // if (r) {
// taosThreadMutexDestroy(&p->lock); // taosThreadMutexDestroy(&p->lock);
// break; // break;
// } // }
// p->val = value; // p->val = value;
// } while (0); // } while (0);
// if (r) { // if (r) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort(); // abort();
// } // }
// #elif defined(SEM_USE_POSIX) // #elif defined(SEM_USE_POSIX)
// static size_t tick = 0; // static size_t tick = 0;
// do { // do {
// size_t id = atomic_add_fetch_64(&tick, 1); // size_t id = atomic_add_fetch_64(&tick, 1);
// if (id == SEM_VALUE_MAX) { // if (id == SEM_VALUE_MAX) {
// atomic_store_64(&tick, 0); // atomic_store_64(&tick, 0);
// id = 0; // id = 0;
// } // }
// char name[NAME_MAX - 4]; // char name[NAME_MAX - 4];
// snprintf(name, sizeof(name), "/t" PRId64, id); // snprintf(name, sizeof(name), "/t" PRId64, id);
// p->sem = sem_open(name, O_CREAT | O_EXCL, pshared, value); // p->sem = sem_open(name, O_CREAT | O_EXCL, pshared, value);
// p->id = id; // p->id = id;
// if (p->sem != SEM_FAILED) break; // if (p->sem != SEM_FAILED) break;
// int e = errno; // int e = errno;
// if (e == EEXIST) continue; // if (e == EEXIST) continue;
// if (e == EINTR) continue; // if (e == EINTR) continue;
// fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem, // sem,
// e, strerror(e)); // e, strerror(e));
// abort(); // abort();
// } while (p->sem == SEM_FAILED); // } while (p->sem == SEM_FAILED);
// #elif defined(SEM_USE_SEM) // #elif defined(SEM_USE_SEM)
// taosThreadOnce(&sem_once, once_init); // taosThreadOnce(&sem_once, once_init);
// if (sem_inited != 1) { // if (sem_inited != 1) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__, sem); // __func__, sem);
// errno = ENOMEM; // errno = ENOMEM;
// return -1; // return -1;
// } // }
// kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value); // kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
// if (ret != KERN_SUCCESS) { // if (ret != KERN_SUCCESS) {
// fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, // fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__, // __func__,
// sem); // sem);
// // we fail-fast here, because we have less-doc about semaphore_create for the moment // // we fail-fast here, because we have less-doc about semaphore_create for the moment
// abort(); // abort();
// } // }
// #else // SEM_USE_PTHREAD // #else // SEM_USE_PTHREAD
// p->sem = dispatch_semaphore_create(value); // p->sem = dispatch_semaphore_create(value);
// if (p->sem == NULL) { // if (p->sem == NULL) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort(); // abort();
// } // }
// #endif // SEM_USE_PTHREAD // #endif // SEM_USE_PTHREAD
// p->valid = 1; // p->valid = 1;
// *sem = p; // *sem = p;
// return 0; // return 0;
// } // }
// int tsem_wait(tsem_t *sem) { // int tsem_wait(tsem_t *sem) {
// if (!*sem) { // if (!*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort(); // abort();
// } // }
// struct tsem_s *p = *sem; // struct tsem_s *p = *sem;
// if (!p->valid) { // if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); abort(); // sem); abort();
// } // }
// #ifdef SEM_USE_PTHREAD // #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) { // if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// p->val -= 1; // p->val -= 1;
// if (p->val < 0) { // if (p->val < 0) {
// if (taosThreadCondWait(&p->cond, &p->lock)) { // if (taosThreadCondWait(&p->cond, &p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__, // __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// } // }
// if (taosThreadMutexUnlock(&p->lock)) { // if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// return 0; // return 0;
// #elif defined(SEM_USE_POSIX) // #elif defined(SEM_USE_POSIX)
// return sem_wait(p->sem); // return sem_wait(p->sem);
// #elif defined(SEM_USE_SEM) // #elif defined(SEM_USE_SEM)
// return semaphore_wait(p->sem); // return semaphore_wait(p->sem);
// #else // SEM_USE_PTHREAD // #else // SEM_USE_PTHREAD
// return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER); // return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER);
// #endif // SEM_USE_PTHREAD // #endif // SEM_USE_PTHREAD
// } // }
// int tsem_post(tsem_t *sem) { // int tsem_post(tsem_t *sem) {
// if (!*sem) { // if (!*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort(); // abort();
// } // }
// struct tsem_s *p = *sem; // struct tsem_s *p = *sem;
// if (!p->valid) { // if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); abort(); // sem); abort();
// } // }
// #ifdef SEM_USE_PTHREAD // #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) { // if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// p->val += 1; // p->val += 1;
// if (p->val <= 0) { // if (p->val <= 0) {
// if (taosThreadCondSignal(&p->cond)) { // if (taosThreadCondSignal(&p->cond)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__, // __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// } // }
// if (taosThreadMutexUnlock(&p->lock)) { // if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// return 0; // return 0;
// #elif defined(SEM_USE_POSIX) // #elif defined(SEM_USE_POSIX)
// return sem_post(p->sem); // return sem_post(p->sem);
// #elif defined(SEM_USE_SEM) // #elif defined(SEM_USE_SEM)
// return semaphore_signal(p->sem); // return semaphore_signal(p->sem);
// #else // SEM_USE_PTHREAD // #else // SEM_USE_PTHREAD
// return dispatch_semaphore_signal(p->sem); // return dispatch_semaphore_signal(p->sem);
// #endif // SEM_USE_PTHREAD // #endif // SEM_USE_PTHREAD
// } // }
// int tsem_destroy(tsem_t *sem) { // int tsem_destroy(tsem_t *sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// if (!*sem) { // if (!*sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// // abort(); // // abort();
// return 0; // return 0;
// } // }
// struct tsem_s *p = *sem; // struct tsem_s *p = *sem;
// if (!p->valid) { // if (!p->valid) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// // sem); abort(); // // sem); abort();
// return 0; // return 0;
// } // }
// #ifdef SEM_USE_PTHREAD // #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) { // if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// p->valid = 0; // p->valid = 0;
// if (taosThreadCondDestroy(&p->cond)) { // if (taosThreadCondDestroy(&p->cond)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// if (taosThreadMutexUnlock(&p->lock)) { // if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// if (taosThreadMutexDestroy(&p->lock)) { // if (taosThreadMutexDestroy(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); // sem);
// abort(); // abort();
// } // }
// #elif defined(SEM_USE_POSIX) // #elif defined(SEM_USE_POSIX)
// char name[NAME_MAX - 4]; // char name[NAME_MAX - 4];
// snprintf(name, sizeof(name), "/t" PRId64, p->id); // snprintf(name, sizeof(name), "/t" PRId64, p->id);
// int r = sem_unlink(name); // int r = sem_unlink(name);
// if (r) { // if (r) {
// int e = errno; // int e = errno;
// fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, // fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem, // sem,
// e, strerror(e)); // e, strerror(e));
// abort(); // abort();
// } // }
// #elif defined(SEM_USE_SEM) // #elif defined(SEM_USE_SEM)
// semaphore_destroy(sem_port, p->sem); // semaphore_destroy(sem_port, p->sem);
// #else // SEM_USE_PTHREAD // #else // SEM_USE_PTHREAD
// #endif // SEM_USE_PTHREAD // #endif // SEM_USE_PTHREAD
// p->valid = 0; // p->valid = 0;
// taosMemoryFree(p); // taosMemoryFree(p);
// *sem = NULL; // *sem = NULL;
// return 0; // return 0;
// } // }
typedef struct {
pthread_mutex_t count_lock; int tsem_init(tsem_t *psem, int flags, unsigned int count) {
pthread_cond_t count_bump; *psem = dispatch_semaphore_create(count);
unsigned int count; if (*psem == NULL) return -1;
} bosal_sem_t; return 0;
}
int tsem_init(tsem_t *psem, int flags, unsigned int count) {
bosal_sem_t *pnewsem; int tsem_destroy(tsem_t *psem) {
int result; return 0;
}
pnewsem = (bosal_sem_t *)malloc(sizeof(bosal_sem_t));
if (!pnewsem) { int tsem_post(tsem_t *psem) {
return -1; if (psem == NULL || *psem == NULL) return -1;
} dispatch_semaphore_signal(*psem);
result = pthread_mutex_init(&pnewsem->count_lock, NULL); return 0;
if (result) { }
free(pnewsem);
return result; int tsem_wait(tsem_t *psem) {
} if (psem == NULL || *psem == NULL) return -1;
result = pthread_cond_init(&pnewsem->count_bump, NULL); dispatch_semaphore_wait(*psem, DISPATCH_TIME_FOREVER);
if (result) { return 0;
pthread_mutex_destroy(&pnewsem->count_lock); }
free(pnewsem);
return result; int tsem_timewait(tsem_t *psem, int64_t nanosecs) {
} if (psem == NULL || *psem == NULL) return -1;
pnewsem->count = count; dispatch_semaphore_wait(*psem, nanosecs);
*psem = (tsem_t)pnewsem; return 0;
return 0; }
}
bool taosCheckPthreadValid(TdThread thread) {
int tsem_destroy(tsem_t *psem) { int32_t ret = taosThreadKill(thread, 0);
bosal_sem_t *poldsem; if (ret == ESRCH) return false;
if (ret == EINVAL) return false;
if (!psem) { // alive
return EINVAL; return true;
} }
poldsem = (bosal_sem_t *)*psem;
int64_t taosGetSelfPthreadId() {
pthread_mutex_destroy(&poldsem->count_lock); TdThread thread = taosThreadSelf();
pthread_cond_destroy(&poldsem->count_bump); return (int64_t)thread;
free(poldsem); }
return 0;
} int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }
int tsem_post(tsem_t *psem) { void taosResetPthread(TdThread *thread) { *thread = NULL; }
bosal_sem_t *pxsem;
int result, xresult; bool taosComparePthread(TdThread first, TdThread second) { return taosThreadEqual(first, second) ? true : false; }
if (!psem) { int32_t taosGetPId() { return (int32_t)getpid(); }
return EINVAL;
} int32_t taosGetAppName(char *name, int32_t *len) {
pxsem = (bosal_sem_t *)*psem; char buf[PATH_MAX + 1];
buf[0] = '\0';
result = pthread_mutex_lock(&pxsem->count_lock); proc_name(getpid(), buf, sizeof(buf) - 1);
if (result) { buf[PATH_MAX] = '\0';
return result; size_t n = strlen(buf);
} if (len) *len = n;
pxsem->count = pxsem->count + 1; if (name) tstrncpy(name, buf, TSDB_APP_NAME_LEN);
return 0;
xresult = pthread_cond_signal(&pxsem->count_bump); }
result = pthread_mutex_unlock(&pxsem->count_lock); #else
if (result) {
return result; /*
} * linux implementation
if (xresult) { */
errno = xresult;
return -1; #include <sys/syscall.h>
} #include <unistd.h>
return 0;
} bool taosCheckPthreadValid(TdThread thread) { return thread != 0; }
int tsem_trywait(tsem_t *psem) { int64_t taosGetSelfPthreadId() {
bosal_sem_t *pxsem; static __thread int id = 0;
int result, xresult; if (id != 0) return id;
id = syscall(SYS_gettid);
if (!psem) { return id;
return EINVAL; }
}
pxsem = (bosal_sem_t *)*psem; int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }
void taosResetPthread(TdThread* thread) { *thread = 0; }
result = pthread_mutex_lock(&pxsem->count_lock); bool taosComparePthread(TdThread first, TdThread second) { return first == second; }
if (result) {
return result; int32_t taosGetPId() {
} static int32_t pid;
xresult = 0; if (pid != 0) return pid;
pid = getpid();
if (pxsem->count > 0) { return pid;
pxsem->count--; }
} else {
xresult = EAGAIN; int32_t taosGetAppName(char* name, int32_t* len) {
} const char* self = "/proc/self/exe";
result = pthread_mutex_unlock(&pxsem->count_lock); char path[PATH_MAX] = {0};
if (result) {
return result; if (readlink(self, path, PATH_MAX) <= 0) {
} return -1;
if (xresult) { }
errno = xresult;
return -1; path[PATH_MAX - 1] = 0;
} char* end = strrchr(path, '/');
return 0; if (end == NULL) {
} return -1;
}
int tsem_wait(tsem_t *psem) {
bosal_sem_t *pxsem; ++end;
int result, xresult;
tstrncpy(name, end, TSDB_APP_NAME_LEN);
if (!psem) {
return EINVAL; if (len != NULL) {
} *len = strlen(name);
pxsem = (bosal_sem_t *)*psem; }
result = pthread_mutex_lock(&pxsem->count_lock); return 0;
if (result) { }
return result;
} int32_t tsem_wait(tsem_t* sem) {
xresult = 0; int ret = 0;
do {
if (pxsem->count == 0) { ret = sem_wait(sem);
xresult = pthread_cond_wait(&pxsem->count_bump, &pxsem->count_lock); } while (ret != 0 && errno == EINTR);
} return ret;
if (!xresult) { }
if (pxsem->count > 0) {
pxsem->count--; int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
} int ret = 0;
}
result = pthread_mutex_unlock(&pxsem->count_lock); struct timespec tv = {
if (result) { .tv_sec = 0,
return result; .tv_nsec = nanosecs,
} };
if (xresult) {
errno = xresult; while ((ret = sem_timedwait(sem, &tv)) == -1 && errno == EINTR) continue;
return -1;
} return ret;
return 0; }
}
#endif
int tsem_timewait(tsem_t *psem, int64_t nanosecs) {
struct timespec abstim = {
.tv_sec = 0,
.tv_nsec = nanosecs,
};
bosal_sem_t *pxsem;
int result, xresult;
if (!psem) {
return EINVAL;
}
pxsem = (bosal_sem_t *)*psem;
result = pthread_mutex_lock(&pxsem->count_lock);
if (result) {
return result;
}
xresult = 0;
if (pxsem->count == 0) {
xresult = pthread_cond_timedwait(&pxsem->count_bump, &pxsem->count_lock, &abstim);
}
if (!xresult) {
if (pxsem->count > 0) {
pxsem->count--;
}
}
result = pthread_mutex_unlock(&pxsem->count_lock);
if (result) {
return result;
}
if (xresult) {
errno = xresult;
return -1;
}
return 0;
}
bool taosCheckPthreadValid(TdThread thread) {
int32_t ret = taosThreadKill(thread, 0);
if (ret == ESRCH) return false;
if (ret == EINVAL) return false;
// alive
return true;
}
int64_t taosGetSelfPthreadId() {
TdThread thread = taosThreadSelf();
return (int64_t)thread;
}
int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }
void taosResetPthread(TdThread *thread) { *thread = NULL; }
bool taosComparePthread(TdThread first, TdThread second) { return taosThreadEqual(first, second) ? true : false; }
int32_t taosGetPId() { return (int32_t)getpid(); }
int32_t taosGetAppName(char *name, int32_t *len) {
char buf[PATH_MAX + 1];
buf[0] = '\0';
proc_name(getpid(), buf, sizeof(buf) - 1);
buf[PATH_MAX] = '\0';
size_t n = strlen(buf);
if (len) *len = n;
if (name) tstrncpy(name, buf, TSDB_APP_NAME_LEN);
return 0;
}
#else
/*
* linux implementation
*/
#include <sys/syscall.h>
#include <unistd.h>
bool taosCheckPthreadValid(TdThread thread) { return thread != 0; }
int64_t taosGetSelfPthreadId() {
static __thread int id = 0;
if (id != 0) return id;
id = syscall(SYS_gettid);
return id;
}
int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; }
void taosResetPthread(TdThread* thread) { *thread = 0; }
bool taosComparePthread(TdThread first, TdThread second) { return first == second; }
int32_t taosGetPId() {
static int32_t pid;
if (pid != 0) return pid;
pid = getpid();
return pid;
}
int32_t taosGetAppName(char* name, int32_t* len) {
const char* self = "/proc/self/exe";
char path[PATH_MAX] = {0};
if (readlink(self, path, PATH_MAX) <= 0) {
return -1;
}
path[PATH_MAX - 1] = 0;
char* end = strrchr(path, '/');
if (end == NULL) {
return -1;
}
++end;
tstrncpy(name, end, TSDB_APP_NAME_LEN);
if (len != NULL) {
*len = strlen(name);
}
return 0;
}
int32_t tsem_wait(tsem_t* sem) {
int ret = 0;
do {
ret = sem_wait(sem);
} while (ret != 0 && errno == EINTR);
return ret;
}
int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
int ret = 0;
struct timespec tv = {
.tv_sec = 0,
.tv_nsec = nanosecs,
};
while ((ret = sem_timedwait(sem, &tv)) == -1 && errno == EINTR) continue;
return ret;
}
#endif
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册