Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
b9a1c674
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
b9a1c674
编写于
7月 02, 2022
作者:
wafwerar
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
os: add Mac compile support
上级
8a76abc3
变更
35
隐藏空白更改
内联
并排
Showing
35 changed file
with
732 addition
and
525 deletion
+732
-525
cmake/cmake.define
cmake/cmake.define
+4
-4
include/libs/qcom/query.h
include/libs/qcom/query.h
+1
-1
include/os/os.h
include/os/os.h
+0
-1
include/os/osAtomic.h
include/os/osAtomic.h
+2
-2
include/os/osSemaphore.h
include/os/osSemaphore.h
+8
-6
include/os/osSocket.h
include/os/osSocket.h
+0
-17
include/os/osThread.h
include/os/osThread.h
+6
-6
source/common/src/ttszip.c
source/common/src/ttszip.c
+0
-4
source/dnode/mnode/impl/inc/mndInt.h
source/dnode/mnode/impl/inc/mndInt.h
+1
-1
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-1
source/libs/catalog/inc/catalogInt.h
source/libs/catalog/inc/catalogInt.h
+1
-1
source/libs/catalog/test/CMakeLists.txt
source/libs/catalog/test/CMakeLists.txt
+19
-17
source/libs/executor/src/dataSinkMgt.c
source/libs/executor/src/dataSinkMgt.c
+1
-1
source/libs/index/src/indexFstSparse.c
source/libs/index/src/indexFstSparse.c
+1
-1
source/libs/monitor/test/monTest.cpp
source/libs/monitor/test/monTest.cpp
+12
-12
source/libs/parser/test/CMakeLists.txt
source/libs/parser/test/CMakeLists.txt
+26
-24
source/libs/qworker/test/CMakeLists.txt
source/libs/qworker/test/CMakeLists.txt
+15
-14
source/libs/scalar/src/scalar.c
source/libs/scalar/src/scalar.c
+1
-1
source/libs/scalar/test/filter/CMakeLists.txt
source/libs/scalar/test/filter/CMakeLists.txt
+15
-13
source/libs/scalar/test/scalar/CMakeLists.txt
source/libs/scalar/test/scalar/CMakeLists.txt
+20
-18
source/libs/scheduler/inc/schedulerInt.h
source/libs/scheduler/inc/schedulerInt.h
+3
-3
source/libs/scheduler/test/CMakeLists.txt
source/libs/scheduler/test/CMakeLists.txt
+15
-13
source/libs/tdb/src/db/tdbTxn.c
source/libs/tdb/src/db/tdbTxn.c
+1
-1
source/os/CMakeLists.txt
source/os/CMakeLists.txt
+8
-4
source/os/src/osAtomic.c
source/os/src/osAtomic.c
+18
-2
source/os/src/osFile.c
source/os/src/osFile.c
+27
-19
source/os/src/osMemory.c
source/os/src/osMemory.c
+6
-0
source/os/src/osSemaphore.c
source/os/src/osSemaphore.c
+474
-260
source/os/src/osSignal.c
source/os/src/osSignal.c
+5
-1
source/os/src/osSocket.c
source/os/src/osSocket.c
+13
-54
source/os/src/osThread.c
source/os/src/osThread.c
+19
-19
source/util/src/tcache.c
source/util/src/tcache.c
+3
-3
source/util/src/ttimer.c
source/util/src/ttimer.c
+1
-1
tools/shell/src/shellArguments.c
tools/shell/src/shellArguments.c
+4
-0
tools/shell/src/shellEngine.c
tools/shell/src/shellEngine.c
+1
-0
未找到文件。
cmake/cmake.define
浏览文件 @
b9a1c674
...
...
@@ -98,12 +98,12 @@ ELSE ()
ENDIF ()
IF (${SANITIZER} MATCHES "true")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3
-Wformat=0
")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -fsanitize=address -fsanitize=undefined -fsanitize-recover=all -fsanitize=float-divide-by-zero -fsanitize=float-cast-overflow -fno-sanitize=shift-base -fno-sanitize=alignment -g3
-Wformat=0
")
MESSAGE(STATUS "Will compile with Address Sanitizer!")
ELSE ()
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3")
SET(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Werror -Werror=return-type -fPIC -gdwarf-2 -g3
-Wformat=0
")
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror -Wno-literal-suffix -Werror=return-type -fPIC -gdwarf-2 -g3
-Wformat=0
")
ENDIF ()
MESSAGE("System processor ID: ${CMAKE_SYSTEM_PROCESSOR}")
...
...
include/libs/qcom/query.h
浏览文件 @
b9a1c674
...
...
@@ -65,7 +65,7 @@ typedef struct SQueryExecRes {
}
SQueryExecRes
;
typedef
struct
SIndexMeta
{
#if
def WINDOWS
#if
defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t
avoidCompilationErrors
;
#endif
...
...
include/os/os.h
浏览文件 @
b9a1c674
...
...
@@ -41,7 +41,6 @@ extern "C" {
#include <sys/types.h>
#include <termios.h>
#include <sys/statvfs.h>
#include <sys/prctl.h>
#include <sys/shm.h>
#include <sys/wait.h>
...
...
include/os/osAtomic.h
浏览文件 @
b9a1c674
...
...
@@ -63,7 +63,7 @@ int8_t atomic_add_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t
atomic_add_fetch_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_add_fetch_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_add_fetch_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_add_fetch_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_add_fetch_ptr
(
void
*
ptr
,
int64_t
val
);
int8_t
atomic_fetch_add_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_fetch_add_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_fetch_add_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
...
...
@@ -73,7 +73,7 @@ int8_t atomic_sub_fetch_8(int8_t volatile *ptr, int8_t val);
int16_t
atomic_sub_fetch_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_sub_fetch_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
int64_t
atomic_sub_fetch_64
(
int64_t
volatile
*
ptr
,
int64_t
val
);
void
*
atomic_sub_fetch_ptr
(
void
*
ptr
,
void
*
val
);
void
*
atomic_sub_fetch_ptr
(
void
*
ptr
,
int64_t
val
);
int8_t
atomic_fetch_sub_8
(
int8_t
volatile
*
ptr
,
int8_t
val
);
int16_t
atomic_fetch_sub_16
(
int16_t
volatile
*
ptr
,
int16_t
val
);
int32_t
atomic_fetch_sub_32
(
int32_t
volatile
*
ptr
,
int32_t
val
);
...
...
include/os/osSemaphore.h
浏览文件 @
b9a1c674
...
...
@@ -24,7 +24,9 @@ extern "C" {
#if defined(_TD_DARWIN_64)
typedef
struct
tsem_s
*
tsem_t
;
// typedef struct tsem_s *tsem_t;
typedef
struct
bosal_sem_t
*
tsem_t
;
int
tsem_init
(
tsem_t
*
sem
,
int
pshared
,
unsigned
int
value
);
int
tsem_wait
(
tsem_t
*
sem
);
...
...
@@ -51,11 +53,11 @@ int tsem_timewait(tsem_t *sim, int64_t nanosecs);
// #define taosThreadRwlockRdlock(lock) taosThreadMutexLock(lock)
// #define taosThreadRwlockUnlock(lock) taosThreadMutexUnlock(lock)
#define TdThreadSpinlock TdThreadMutex
#define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL)
#define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock)
#define taosThreadSpinLock(lock) taosThreadMutexLock(lock)
#define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock)
//
#define TdThreadSpinlock TdThreadMutex
//
#define taosThreadSpinInit(lock, NULL) taosThreadMutexInit(lock, NULL)
//
#define taosThreadSpinDestroy(lock) taosThreadMutexDestroy(lock)
//
#define taosThreadSpinLock(lock) taosThreadMutexLock(lock)
//
#define taosThreadSpinUnlock(lock) taosThreadMutexUnlock(lock)
#endif
bool
taosCheckPthreadValid
(
TdThread
thread
);
...
...
include/os/osSocket.h
浏览文件 @
b9a1c674
...
...
@@ -64,7 +64,6 @@
#include <osEok.h>
#else
#include <netinet/in.h>
#include <sys/epoll.h>
#endif
#endif
...
...
@@ -77,15 +76,12 @@ typedef int socklen_t;
#define TAOS_EPOLL_WAIT_TIME 100
typedef
SOCKET
eventfd_t
;
#define eventfd(a, b) -1
#define EpollClose(pollFd) epoll_close(pollFd)
#ifndef EPOLLWAKEUP
#define EPOLLWAKEUP (1u << 29)
#endif
#elif defined(_TD_DARWIN_64)
#define TAOS_EPOLL_WAIT_TIME 500
typedef
int32_t
SOCKET
;
typedef
SOCKET
EpollFd
;
#define EpollClose(pollFd) epoll_close(pollFd)
#else
#define TAOS_EPOLL_WAIT_TIME 500
typedef
int32_t
SOCKET
;
...
...
@@ -122,14 +118,6 @@ typedef SOCKET EpollFd;
typedef
int32_t
SocketFd
;
typedef
SocketFd
EpollFd
;
typedef
struct
TdSocket
{
#if SOCKET_WITH_LOCK
TdThreadRwlock
rwlock
;
#endif
int
refId
;
SocketFd
fd
;
}
*
TdSocketPtr
,
TdSocket
;
typedef
struct
TdSocketServer
*
TdSocketServerPtr
;
typedef
struct
TdSocket
*
TdSocketPtr
;
typedef
struct
TdEpoll
*
TdEpollPtr
;
...
...
@@ -181,11 +169,6 @@ void taosSetMaskSIGPIPE();
uint32_t
taosInetAddr
(
const
char
*
ipAddr
);
const
char
*
taosInetNtoa
(
struct
in_addr
ipInt
);
TdEpollPtr
taosCreateEpoll
(
int32_t
size
);
int32_t
taosCtlEpoll
(
TdEpollPtr
pEpoll
,
int32_t
epollOperate
,
TdSocketPtr
pSocket
,
struct
epoll_event
*
event
);
int32_t
taosWaitEpoll
(
TdEpollPtr
pEpoll
,
struct
epoll_event
*
event
,
int32_t
maxEvents
,
int32_t
timeout
);
int32_t
taosCloseEpoll
(
TdEpollPtr
*
ppEpoll
);
#ifdef __cplusplus
}
#endif
...
...
include/os/osThread.h
浏览文件 @
b9a1c674
...
...
@@ -188,27 +188,27 @@ int32_t taosThreadJoin(TdThread thread, void **valuePtr);
int32_t
taosThreadKeyCreate
(
TdThreadKey
*
key
,
void
(
*
destructor
)(
void
*
));
int32_t
taosThreadKeyDelete
(
TdThreadKey
key
);
int32_t
taosThreadKill
(
TdThread
thread
,
int32_t
sig
);
int32_t
taosThreadMutexConsistent
(
TdThreadMutex
*
mutex
);
//
int32_t taosThreadMutexConsistent(TdThreadMutex* mutex);
int32_t
taosThreadMutexDestroy
(
TdThreadMutex
*
mutex
);
int32_t
taosThreadMutexInit
(
TdThreadMutex
*
mutex
,
const
TdThreadMutexAttr
*
attr
);
int32_t
taosThreadMutexLock
(
TdThreadMutex
*
mutex
);
int32_t
taosThreadMutexTimedLock
(
TdThreadMutex
*
mutex
,
const
struct
timespec
*
abstime
);
//
int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime);
int32_t
taosThreadMutexTryLock
(
TdThreadMutex
*
mutex
);
int32_t
taosThreadMutexUnlock
(
TdThreadMutex
*
mutex
);
int32_t
taosThreadMutexAttrDestroy
(
TdThreadMutexAttr
*
attr
);
int32_t
taosThreadMutexAttrGetPshared
(
const
TdThreadMutexAttr
*
attr
,
int32_t
*
pshared
);
int32_t
taosThreadMutexAttrGetRobust
(
const
TdThreadMutexAttr
*
attr
,
int32_t
*
robust
);
//
int32_t taosThreadMutexAttrGetRobust(const TdThreadMutexAttr * attr, int32_t * robust);
int32_t
taosThreadMutexAttrGetType
(
const
TdThreadMutexAttr
*
attr
,
int32_t
*
kind
);
int32_t
taosThreadMutexAttrInit
(
TdThreadMutexAttr
*
attr
);
int32_t
taosThreadMutexAttrSetPshared
(
TdThreadMutexAttr
*
attr
,
int32_t
pshared
);
int32_t
taosThreadMutexAttrSetRobust
(
TdThreadMutexAttr
*
attr
,
int32_t
robust
);
//
int32_t taosThreadMutexAttrSetRobust(TdThreadMutexAttr * attr, int32_t robust);
int32_t
taosThreadMutexAttrSetType
(
TdThreadMutexAttr
*
attr
,
int32_t
kind
);
int32_t
taosThreadOnce
(
TdThreadOnce
*
onceControl
,
void
(
*
initRoutine
)(
void
));
int32_t
taosThreadRwlockDestroy
(
TdThreadRwlock
*
rwlock
);
int32_t
taosThreadRwlockInit
(
TdThreadRwlock
*
rwlock
,
const
TdThreadRwlockAttr
*
attr
);
int32_t
taosThreadRwlockRdlock
(
TdThreadRwlock
*
rwlock
);
int32_t
taosThreadRwlockTimedRdlock
(
TdThreadRwlock
*
rwlock
,
const
struct
timespec
*
abstime
);
int32_t
taosThreadRwlockTimedWrlock
(
TdThreadRwlock
*
rwlock
,
const
struct
timespec
*
abstime
);
//
int32_t taosThreadRwlockTimedRdlock(TdThreadRwlock * rwlock, const struct timespec *abstime);
//
int32_t taosThreadRwlockTimedWrlock(TdThreadRwlock * rwlock, const struct timespec *abstime);
int32_t
taosThreadRwlockTryRdlock
(
TdThreadRwlock
*
rwlock
);
int32_t
taosThreadRwlockTryWrlock
(
TdThreadRwlock
*
rwlock
);
int32_t
taosThreadRwlockUnlock
(
TdThreadRwlock
*
rwlock
);
...
...
source/common/src/ttszip.c
浏览文件 @
b9a1c674
...
...
@@ -845,11 +845,7 @@ int32_t tsBufMerge(STSBuf* pDestBuf, const STSBuf* pSrcBuf) {
int64_t
offset
=
getDataStartOffset
();
int32_t
size
=
(
int32_t
)
pSrcBuf
->
fileSize
-
(
int32_t
)
offset
;
#if defined(_TD_DARWIN_64)
int64_t
written
=
taosFSendFile
(
pDestBuf
->
pFile
->
fp
,
pSrcBuf
->
pFile
->
fp
,
&
offset
,
size
);
#else
int64_t
written
=
taosFSendFile
(
pDestBuf
->
pFile
,
pSrcBuf
->
pFile
,
&
offset
,
size
);
#endif
if
(
written
==
-
1
||
written
!=
size
)
{
return
-
1
;
...
...
source/dnode/mnode/impl/inc/mndInt.h
浏览文件 @
b9a1c674
...
...
@@ -87,7 +87,7 @@ typedef struct {
}
STelemMgmt
;
typedef
struct
{
sem_t
syncSem
;
t
sem_t
syncSem
;
int64_t
sync
;
bool
standby
;
SReplica
replica
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
b9a1c674
...
...
@@ -241,7 +241,7 @@ struct SVnode {
tsem_t
canCommit
;
int64_t
sync
;
int32_t
syncCount
;
sem_t
syncSem
;
tsem_t
syncSem
;
SQHandle
*
pQuery
;
};
...
...
source/libs/catalog/inc/catalogInt.h
浏览文件 @
b9a1c674
...
...
@@ -278,7 +278,7 @@ typedef struct SCtgAsyncFps {
typedef
struct
SCtgApiStat
{
#if
def WINDOWS
#if
defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t
avoidCompilationErrors
;
#endif
...
...
source/libs/catalog/test/CMakeLists.txt
浏览文件 @
b9a1c674
MESSAGE
(
STATUS
"build catalog unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
IF
(
NOT TD_DARWIN
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
catalogTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
catalogTest
PUBLIC os util common catalog transport gtest qcom taos_static
)
ADD_EXECUTABLE
(
catalogTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
catalogTest
PUBLIC os util common catalog transport gtest qcom taos_static
)
TARGET_INCLUDE_DIRECTORIES
(
catalogTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/catalog/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/catalog/inc"
)
TARGET_INCLUDE_DIRECTORIES
(
catalogTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/catalog/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/catalog/inc"
)
# add_test(
# NAME catalogTest
# COMMAND catalogTest
# )
# add_test(
# NAME catalogTest
# COMMAND catalogTest
# )
ENDIF
()
source/libs/executor/src/dataSinkMgt.c
浏览文件 @
b9a1c674
...
...
@@ -35,7 +35,7 @@ int32_t dsDataSinkGetCacheSize(SDataSinkStat *pStat) {
int32_t
dsCreateDataSinker
(
const
SDataSinkNode
*
pDataSink
,
DataSinkHandle
*
pHandle
,
void
*
pParam
)
{
switch
(
nodeType
(
pDataSink
))
{
switch
(
(
int
)
nodeType
(
pDataSink
))
{
case
QUERY_NODE_PHYSICAL_PLAN_DISPATCH
:
return
createDataDispatcher
(
&
gDataSinkManager
,
pDataSink
,
pHandle
);
case
QUERY_NODE_PHYSICAL_PLAN_DELETE
:
...
...
source/libs/index/src/indexFstSparse.c
浏览文件 @
b9a1c674
...
...
@@ -17,7 +17,7 @@
FstSparseSet
*
sparSetCreate
(
int32_t
sz
)
{
FstSparseSet
*
ss
=
taosMemoryCalloc
(
1
,
sizeof
(
FstSparseSet
));
if
(
ss
=
NULL
)
{
if
(
ss
=
=
NULL
)
{
return
NULL
;
}
...
...
source/libs/monitor/test/monTest.cpp
浏览文件 @
b9a1c674
...
...
@@ -75,18 +75,18 @@ void MonitorTest::GetSysInfo(SMonSysInfo *pInfo) {
pInfo
->
cpu_engine
=
2.1
;
pInfo
->
cpu_system
=
2.1
;
pInfo
->
cpu_cores
=
2
;
pInfo
->
mem_engine
=
3
.1
;
pInfo
->
mem_system
=
3
.2
;
pInfo
->
mem_total
=
3
.3
;
pInfo
->
disk_engine
=
4
.1
;
pInfo
->
disk_used
=
4
.2
;
pInfo
->
disk_total
=
4
.3
;
pInfo
->
net_in
=
5
.1
;
pInfo
->
net_out
=
5
.2
;
pInfo
->
io_read
=
6
.1
;
pInfo
->
io_write
=
6
.2
;
pInfo
->
io_read_disk
=
7
.1
;
pInfo
->
io_write_disk
=
7
.2
;
pInfo
->
mem_engine
=
3
;
pInfo
->
mem_system
=
3
;
pInfo
->
mem_total
=
3
;
pInfo
->
disk_engine
=
4
;
pInfo
->
disk_used
=
4
;
pInfo
->
disk_total
=
4
;
pInfo
->
net_in
=
5
;
pInfo
->
net_out
=
5
;
pInfo
->
io_read
=
6
;
pInfo
->
io_write
=
6
;
pInfo
->
io_read_disk
=
7
;
pInfo
->
io_write_disk
=
7
;
}
void
MonitorTest
::
GetClusterInfo
(
SMonClusterInfo
*
pInfo
)
{
...
...
source/libs/parser/test/CMakeLists.txt
浏览文件 @
b9a1c674
MESSAGE
(
STATUS
"build parser unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
IF
(
NOT TD_DARWIN
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
parserTest
${
SOURCE_LIST
}
)
ADD_EXECUTABLE
(
parserTest
${
SOURCE_LIST
}
)
TARGET_INCLUDE_DIRECTORIES
(
parserTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/parser/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/parser/inc"
)
TARGET_LINK_LIBRARIES
(
parserTest
PUBLIC os util common nodes parser catalog transport gtest function planner qcom
)
TARGET_INCLUDE_DIRECTORIES
(
parserTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/parser/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/parser/inc"
)
if
(
${
BUILD_WINGETOPT
}
)
target_include_directories
(
parserTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/wingetopt/src"
TARGET_LINK_LIBRARIES
(
parserTest
PUBLIC os util common nodes parser catalog transport gtest function planner qcom
)
target_link_libraries
(
parserTest PUBLIC wingetopt
)
endif
()
add_test
(
NAME parserTest
COMMAND parserTest
)
if
(
${
BUILD_WINGETOPT
}
)
target_include_directories
(
parserTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/wingetopt/src"
)
target_link_libraries
(
parserTest PUBLIC wingetopt
)
endif
()
add_test
(
NAME parserTest
COMMAND parserTest
)
ENDIF
()
\ No newline at end of file
source/libs/qworker/test/CMakeLists.txt
浏览文件 @
b9a1c674
MESSAGE
(
STATUS
"build qworker unit test"
)
IF
(
NOT TD_DARWIN
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
qworkerTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
qworkerTest
PUBLIC os util common transport gtest qcom nodes planner qworker executor
)
ADD_EXECUTABLE
(
qworkerTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
qworkerTest
PUBLIC os util common transport gtest qcom nodes planner qworker executor
)
TARGET_INCLUDE_DIRECTORIES
(
qworkerTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/qworker/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/qworker/inc"
)
TARGET_INCLUDE_DIRECTORIES
(
qworkerTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/qworker/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/qworker/inc"
)
ENDIF
()
source/libs/scalar/src/scalar.c
浏览文件 @
b9a1c674
...
...
@@ -345,7 +345,7 @@ int32_t sclGetNodeType(SNode *pNode, SScalarCtx *ctx) {
return
-
1
;
}
switch
(
nodeType
(
pNode
))
{
switch
(
(
int
)
nodeType
(
pNode
))
{
case
QUERY_NODE_VALUE
:
{
SValueNode
*
valueNode
=
(
SValueNode
*
)
pNode
;
return
valueNode
->
node
.
resType
.
type
;
...
...
source/libs/scalar/test/filter/CMakeLists.txt
浏览文件 @
b9a1c674
MESSAGE
(
STATUS
"build filter unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
IF
(
NOT TD_DARWIN
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
filterTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
filterTest
PUBLIC os util common gtest qcom function nodes scalar
)
ADD_EXECUTABLE
(
filterTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
filterTest
PUBLIC os util common gtest qcom function nodes scalar
)
TARGET_INCLUDE_DIRECTORIES
(
filterTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/scalar/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/scalar/inc"
)
TARGET_INCLUDE_DIRECTORIES
(
filterTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/scalar/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/scalar/inc"
)
ENDIF
()
\ No newline at end of file
source/libs/scalar/test/scalar/CMakeLists.txt
浏览文件 @
b9a1c674
MESSAGE
(
STATUS
"build scalar unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
IF
(
NOT TD_DARWIN
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
scalarTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
scalarTest
PUBLIC os util common gtest qcom function nodes scalar parser catalog transport
)
ADD_EXECUTABLE
(
scalarTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
scalarTest
PUBLIC os util common gtest qcom function nodes scalar parser catalog transport
)
TARGET_INCLUDE_DIRECTORIES
(
scalarTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/scalar/"
PUBLIC
"
${
TD_SOURCE_DIR
}
/source/libs/parser/inc"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/scalar/inc"
)
add_test
(
NAME scalarTest
COMMAND scalarTest
)
TARGET_INCLUDE_DIRECTORIES
(
scalarTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/scalar/"
PUBLIC
"
${
TD_SOURCE_DIR
}
/source/libs/parser/inc"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/scalar/inc"
)
add_test
(
NAME scalarTest
COMMAND scalarTest
)
ENDIF
()
source/libs/scheduler/inc/schedulerInt.h
浏览文件 @
b9a1c674
...
...
@@ -68,7 +68,7 @@ typedef struct SSchHbTrans {
typedef
struct
SSchApiStat
{
#if
def WINDOWS
#if
defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t
avoidCompilationErrors
;
#endif
...
...
@@ -76,7 +76,7 @@ typedef struct SSchApiStat {
typedef
struct
SSchRuntimeStat
{
#if
def WINDOWS
#if
defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t
avoidCompilationErrors
;
#endif
...
...
@@ -84,7 +84,7 @@ typedef struct SSchRuntimeStat {
typedef
struct
SSchJobStat
{
#if
def WINDOWS
#if
defined(WINDOWS) || defined(_TD_DARWIN_64)
size_t
avoidCompilationErrors
;
#endif
...
...
source/libs/scheduler/test/CMakeLists.txt
浏览文件 @
b9a1c674
MESSAGE
(
STATUS
"build scheduler unit test"
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
IF
(
NOT TD_DARWIN
)
# GoogleTest requires at least C++11
SET
(
CMAKE_CXX_STANDARD 11
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
SOURCE_LIST
)
ADD_EXECUTABLE
(
schedulerTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
schedulerTest
PUBLIC os util common catalog transport gtest qcom taos_static planner scheduler
)
ADD_EXECUTABLE
(
schedulerTest
${
SOURCE_LIST
}
)
TARGET_LINK_LIBRARIES
(
schedulerTest
PUBLIC os util common catalog transport gtest qcom taos_static planner scheduler
)
TARGET_INCLUDE_DIRECTORIES
(
schedulerTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/scheduler/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/scheduler/inc"
)
TARGET_INCLUDE_DIRECTORIES
(
schedulerTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/scheduler/"
PRIVATE
"
${
TD_SOURCE_DIR
}
/source/libs/scheduler/inc"
)
ENDIF
()
\ No newline at end of file
source/libs/tdb/src/db/tdbTxn.c
浏览文件 @
b9a1c674
...
...
@@ -18,7 +18,7 @@
int
tdbTxnOpen
(
TXN
*
pTxn
,
int64_t
txnid
,
void
*
(
*
xMalloc
)(
void
*
,
size_t
),
void
(
*
xFree
)(
void
*
,
void
*
),
void
*
xArg
,
int
flags
)
{
// not support read-committed version at the moment
ASSERT
(
flags
==
0
||
flags
==
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
);
ASSERT
(
flags
==
0
||
flags
==
(
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
);
pTxn
->
flags
=
flags
;
pTxn
->
txnId
=
txnid
;
...
...
source/os/CMakeLists.txt
浏览文件 @
b9a1c674
...
...
@@ -39,12 +39,16 @@ endif()
target_link_libraries
(
os PUBLIC pthread
)
if
(
NOT
TD_WINDOWS
)
if
(
TD_WINDOWS
)
target_link_libraries
(
os PUBLIC dl m rt
os PUBLIC ws2_32 iconv msvcregex wcwidth winmm
)
elseif
(
TD_DARWIN_64
)
target_link_libraries
(
os PUBLIC dl m iconv
)
else
()
target_link_libraries
(
os PUBLIC
ws2_32 iconv msvcregex wcwidth winmm
os PUBLIC
dl m rt
)
endif
(
NOT TD_WINDOWS
)
endif
()
source/os/src/osAtomic.c
浏览文件 @
b9a1c674
...
...
@@ -518,7 +518,7 @@ int64_t atomic_add_fetch_64(int64_t volatile *ptr, int64_t val) {
#endif
}
void
*
atomic_add_fetch_ptr
(
void
*
ptr
,
void
*
val
)
{
void
*
atomic_add_fetch_ptr
(
void
*
ptr
,
int64_t
val
)
{
#ifdef WINDOWS
return
interlocked_add_fetch_ptr
((
void
*
volatile
*
)(
ptr
),
(
void
*
)(
val
));
#elif defined(_TD_NINGSI_60)
...
...
@@ -618,11 +618,13 @@ int64_t atomic_sub_fetch_64(int64_t volatile *ptr, int64_t val) {
#endif
}
void
*
atomic_sub_fetch_ptr
(
void
*
ptr
,
void
*
val
)
{
void
*
atomic_sub_fetch_ptr
(
void
*
ptr
,
int64_t
val
)
{
#ifdef WINDOWS
return
interlocked_sub_fetch_ptr
(
ptr
,
val
);
#elif defined(_TD_NINGSI_60)
return
__sync_sub_and_fetch
((
ptr
),
(
val
));
#elif defined(_TD_DARWIN_64)
return
__atomic_sub_fetch
((
void
**
)(
ptr
),
(
size_t
)(
val
),
__ATOMIC_SEQ_CST
);
#else
return
__atomic_sub_fetch
((
void
**
)(
ptr
),
(
val
),
__ATOMIC_SEQ_CST
);
#endif
...
...
@@ -673,6 +675,8 @@ void* atomic_fetch_sub_ptr(void *ptr, void* val) {
return
interlocked_fetch_sub_ptr
(
ptr
,
val
);
#elif defined(_TD_NINGSI_60)
return
__sync_fetch_and_sub
((
ptr
),
(
val
));
#elif defined(_TD_DARWIN_64)
return
__atomic_fetch_sub
((
void
**
)(
ptr
),
(
size_t
)(
val
),
__ATOMIC_SEQ_CST
);
#else
return
__atomic_fetch_sub
((
void
**
)(
ptr
),
(
val
),
__ATOMIC_SEQ_CST
);
#endif
...
...
@@ -723,6 +727,8 @@ void* atomic_and_fetch_ptr(void *ptr, void *val) {
return
interlocked_and_fetch_ptr
((
void
*
volatile
*
)(
ptr
),
(
void
*
)(
val
));
#elif defined(_TD_NINGSI_60)
return
__sync_and_and_fetch
((
ptr
),
(
val
));
#elif defined(_TD_DARWIN_64)
return
(
void
*
)
__atomic_and_fetch
((
size_t
*
)(
ptr
),
(
size_t
)(
val
),
__ATOMIC_SEQ_CST
);
#else
return
__atomic_and_fetch
((
void
**
)(
ptr
),
(
val
),
__ATOMIC_SEQ_CST
);
#endif
...
...
@@ -773,6 +779,8 @@ void* atomic_fetch_and_ptr(void *ptr, void *val) {
return
interlocked_fetch_and_ptr
((
void
*
volatile
*
)(
ptr
),
(
void
*
)(
val
));
#elif defined(_TD_NINGSI_60)
return
__sync_fetch_and_and
((
ptr
),
(
val
));
#elif defined(_TD_DARWIN_64)
return
(
void
*
)
__atomic_fetch_and
((
size_t
*
)(
ptr
),
(
size_t
)(
val
),
__ATOMIC_SEQ_CST
);
#else
return
__atomic_fetch_and
((
void
**
)(
ptr
),
(
val
),
__ATOMIC_SEQ_CST
);
#endif
...
...
@@ -823,6 +831,8 @@ void* atomic_or_fetch_ptr(void *ptr, void *val) {
return
interlocked_or_fetch_ptr
((
void
*
volatile
*
)(
ptr
),
(
void
*
)(
val
));
#elif defined(_TD_NINGSI_60)
return
__sync_or_and_fetch
((
ptr
),
(
val
));
#elif defined(_TD_DARWIN_64)
return
(
void
*
)
__atomic_or_fetch
((
size_t
*
)(
ptr
),
(
size_t
)(
val
),
__ATOMIC_SEQ_CST
);
#else
return
__atomic_or_fetch
((
void
**
)(
ptr
),
(
val
),
__ATOMIC_SEQ_CST
);
#endif
...
...
@@ -873,6 +883,8 @@ void* atomic_fetch_or_ptr(void *ptr, void *val) {
return
interlocked_fetch_or_ptr
((
void
*
volatile
*
)(
ptr
),
(
void
*
)(
val
));
#elif defined(_TD_NINGSI_60)
return
__sync_fetch_and_or
((
ptr
),
(
val
));
#elif defined(_TD_DARWIN_64)
return
(
void
*
)
__atomic_fetch_or
((
size_t
*
)(
ptr
),
(
size_t
)(
val
),
__ATOMIC_SEQ_CST
);
#else
return
__atomic_fetch_or
((
void
**
)(
ptr
),
(
val
),
__ATOMIC_SEQ_CST
);
#endif
...
...
@@ -923,6 +935,8 @@ void* atomic_xor_fetch_ptr(void *ptr, void *val) {
return
interlocked_xor_fetch_ptr
((
void
*
volatile
*
)(
ptr
),
(
void
*
)(
val
));
#elif defined(_TD_NINGSI_60)
return
__sync_xor_and_fetch
((
ptr
),
(
val
));
#elif defined(_TD_DARWIN_64)
return
(
void
*
)
__atomic_xor_fetch
((
size_t
*
)(
ptr
),
(
size_t
)(
val
),
__ATOMIC_SEQ_CST
);
#else
return
__atomic_xor_fetch
((
void
**
)(
ptr
),
(
val
),
__ATOMIC_SEQ_CST
);
#endif
...
...
@@ -973,6 +987,8 @@ void* atomic_fetch_xor_ptr(void *ptr, void *val) {
return
interlocked_fetch_xor_ptr
((
void
*
volatile
*
)(
ptr
),
(
void
*
)(
val
));
#elif defined(_TD_NINGSI_60)
return
__sync_fetch_and_xor
((
ptr
),
(
val
));
#elif defined(_TD_DARWIN_64)
return
(
void
*
)
__atomic_fetch_xor
((
size_t
*
)(
ptr
),
(
size_t
)(
val
),
__ATOMIC_SEQ_CST
);
#else
return
__atomic_fetch_xor
((
void
**
)(
ptr
),
(
val
),
__ATOMIC_SEQ_CST
);
#endif
...
...
source/os/src/osFile.c
浏览文件 @
b9a1c674
...
...
@@ -35,6 +35,8 @@
#include <unistd.h>
#define LINUX_FILE_NO_TEXT_OPTION 0
#define O_TEXT LINUX_FILE_NO_TEXT_OPTION
#define _SEND_FILE_STEP_ 1000
#endif
#if defined(WINDOWS)
...
...
@@ -612,28 +614,34 @@ int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, in
#elif defined(_TD_DARWIN_64)
int
r
=
0
;
i
f
(
offset
)
{
r
=
fseek
(
in_file
,
*
offset
,
SEEK_SET
)
;
if
(
r
==
-
1
)
return
-
1
;
}
off_t
len
=
size
;
while
(
len
>
0
)
{
char
buf
[
1024
*
16
]
;
off_t
n
=
sizeof
(
buf
);
if
(
len
<
n
)
n
=
len
;
size_t
m
=
fread
(
buf
,
1
,
n
,
in_file
);
if
(
m
<
n
)
{
int
e
=
ferror
(
in_file
);
if
(
e
)
return
-
1
;
lseek
(
pFileIn
->
fd
,
(
int32_t
)(
*
offset
),
0
)
;
i
nt64_t
writeLen
=
0
;
uint8_t
buffer
[
_SEND_FILE_STEP_
]
=
{
0
}
;
for
(
int64_t
len
=
0
;
len
<
(
size
-
_SEND_FILE_STEP_
);
len
+=
_SEND_FILE_STEP_
)
{
size_t
rlen
=
read
(
pFileIn
->
fd
,
(
void
*
)
buffer
,
_SEND_FILE_STEP_
)
;
if
(
rlen
<=
0
)
{
return
writeLen
;
}
else
if
(
rlen
<
_SEND_FILE_STEP_
)
{
write
(
pFileOut
->
fd
,
(
void
*
)
buffer
,
(
uint32_t
)
rlen
)
;
return
(
int64_t
)(
writeLen
+
rlen
);
}
else
{
write
(
pFileOut
->
fd
,
(
void
*
)
buffer
,
_SEND_FILE_STEP_
);
writeLen
+=
_SEND_FILE_STEP_
;
}
if
(
m
==
0
)
break
;
if
(
m
!=
fwrite
(
buf
,
1
,
m
,
out_file
))
{
return
-
1
;
}
int64_t
remain
=
size
-
writeLen
;
if
(
remain
>
0
)
{
size_t
rlen
=
read
(
pFileIn
->
fd
,
(
void
*
)
buffer
,
(
size_t
)
remain
);
if
(
rlen
<=
0
)
{
return
writeLen
;
}
else
{
write
(
pFileOut
->
fd
,
(
void
*
)
buffer
,
(
uint32_t
)
remain
);
writeLen
+=
remain
;
}
len
-=
m
;
}
return
size
-
l
en
;
return
writeL
en
;
#else
...
...
source/os/src/osMemory.c
浏览文件 @
b9a1c674
...
...
@@ -14,7 +14,11 @@
*/
#define ALLOW_FORBID_FUNC
#ifdef _TD_DARWIN_64
#include <malloc/malloc.h>
#else
#include <malloc.h>
#endif
#include "os.h"
#if defined(USE_TD_MEMORY) || defined(USE_ADDR2LINE)
...
...
@@ -323,6 +327,8 @@ int32_t taosMemorySize(void *ptr) {
#else
#ifdef WINDOWS
return
_msize
(
ptr
);
#elif defined(_TD_DARWIN_64)
return
malloc_size
(
ptr
);
#else
return
malloc_usable_size
(
ptr
);
#endif
...
...
source/os/src/osSemaphore.c
浏览文件 @
b9a1c674
...
...
@@ -13,8 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _DEFAULT_SOURCE
#include "os.h"
#include "pthread.h"
#ifdef WINDOWS
...
...
@@ -111,289 +113,501 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
// #define SEM_USE_PTHREAD
// #define SEM_USE_POSIX
#define SEM_USE_SEM
#ifdef SEM_USE_SEM
#include <mach/mach_error.h>
#include <mach/mach_init.h>
#include <mach/semaphore.h>
#include <mach/task.h>
static
TdThread
sem_thread
;
static
TdThreadOnce
sem_once
;
static
task_t
sem_port
;
static
volatile
int
sem_inited
=
0
;
static
semaphore_t
sem_exit
;
static
void
*
sem_thread_routine
(
void
*
arg
)
{
(
void
)
arg
;
setThreadName
(
"sem_thrd"
);
sem_port
=
mach_task_self
();
kern_return_t
ret
=
semaphore_create
(
sem_port
,
&
sem_exit
,
SYNC_POLICY_FIFO
,
0
);
if
(
ret
!=
KERN_SUCCESS
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create sem_exit
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
);
sem_inited
=
-
1
;
return
NULL
;
}
sem_inited
=
1
;
semaphore_wait
(
sem_exit
);
return
NULL
;
// #define SEM_USE_SEM
// #ifdef SEM_USE_SEM
// #include <mach/mach_error.h>
// #include <mach/mach_init.h>
// #include <mach/semaphore.h>
// #include <mach/task.h>
// static TdThread sem_thread;
// static TdThreadOnce sem_once;
// static task_t sem_port;
// static volatile int sem_inited = 0;
// static semaphore_t sem_exit;
// static void *sem_thread_routine(void *arg) {
// (void)arg;
// setThreadName("sem_thrd");
// sem_port = mach_task_self();
// kern_return_t ret = semaphore_create(sem_port, &sem_exit, SYNC_POLICY_FIFO, 0);
// if (ret != KERN_SUCCESS) {
// fprintf(stderr, "==%s[%d]%s()==failed to create sem_exit\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
// sem_inited = -1;
// return NULL;
// }
// sem_inited = 1;
// semaphore_wait(sem_exit);
// return NULL;
// }
// static void once_init(void) {
// int r = 0;
// r = taosThreadCreate(&sem_thread, NULL, sem_thread_routine, NULL);
// if (r) {
// fprintf(stderr, "==%s[%d]%s()==failed to create thread\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__);
// return;
// }
// while (sem_inited == 0) {
// ;
// }
// }
// #endif
// struct tsem_s {
// #ifdef SEM_USE_PTHREAD
// TdThreadMutex lock;
// TdThreadCond cond;
// volatile int64_t val;
// #elif defined(SEM_USE_POSIX)
// size_t id;
// sem_t *sem;
// #elif defined(SEM_USE_SEM)
// semaphore_t sem;
// #else // SEM_USE_PTHREAD
// dispatch_semaphore_t sem;
// #endif // SEM_USE_PTHREAD
// volatile unsigned int valid : 1;
// };
// int tsem_init(tsem_t *sem, int pshared, unsigned int value) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// if (*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// struct tsem_s *p = (struct tsem_s *)taosMemoryCalloc(1, sizeof(*p));
// if (!p) {
// fprintf(stderr, "==%s[%d]%s():[%p]==out of memory\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #ifdef SEM_USE_PTHREAD
// int r = taosThreadMutexInit(&p->lock, NULL);
// do {
// if (r) break;
// r = taosThreadCondInit(&p->cond, NULL);
// if (r) {
// taosThreadMutexDestroy(&p->lock);
// break;
// }
// p->val = value;
// } while (0);
// if (r) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #elif defined(SEM_USE_POSIX)
// static size_t tick = 0;
// do {
// size_t id = atomic_add_fetch_64(&tick, 1);
// if (id == SEM_VALUE_MAX) {
// atomic_store_64(&tick, 0);
// id = 0;
// }
// char name[NAME_MAX - 4];
// snprintf(name, sizeof(name), "/t%ld", id);
// p->sem = sem_open(name, O_CREAT | O_EXCL, pshared, value);
// p->id = id;
// if (p->sem != SEM_FAILED) break;
// int e = errno;
// if (e == EEXIST) continue;
// if (e == EINTR) continue;
// fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
// e, strerror(e));
// abort();
// } while (p->sem == SEM_FAILED);
// #elif defined(SEM_USE_SEM)
// taosThreadOnce(&sem_once, once_init);
// if (sem_inited != 1) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal resource init failed\n", taosDirEntryBaseName(__FILE__), __LINE__,
// __func__, sem);
// errno = ENOMEM;
// return -1;
// }
// kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value);
// if (ret != KERN_SUCCESS) {
// fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// // we fail-fast here, because we have less-doc about semaphore_create for the moment
// abort();
// }
// #else // SEM_USE_PTHREAD
// p->sem = dispatch_semaphore_create(value);
// if (p->sem == NULL) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not created\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #endif // SEM_USE_PTHREAD
// p->valid = 1;
// *sem = p;
// return 0;
// }
// int tsem_wait(tsem_t *sem) {
// if (!*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// struct tsem_s *p = *sem;
// if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// p->val -= 1;
// if (p->val < 0) {
// if (taosThreadCondWait(&p->cond, &p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// }
// if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// return 0;
// #elif defined(SEM_USE_POSIX)
// return sem_wait(p->sem);
// #elif defined(SEM_USE_SEM)
// return semaphore_wait(p->sem);
// #else // SEM_USE_PTHREAD
// return dispatch_semaphore_wait(p->sem, DISPATCH_TIME_FOREVER);
// #endif // SEM_USE_PTHREAD
// }
// int tsem_post(tsem_t *sem) {
// if (!*sem) {
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// struct tsem_s *p = *sem;
// if (!p->valid) {
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
// }
// #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// p->val += 1;
// if (p->val <= 0) {
// if (taosThreadCondSignal(&p->cond)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// }
// if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// return 0;
// #elif defined(SEM_USE_POSIX)
// return sem_post(p->sem);
// #elif defined(SEM_USE_SEM)
// return semaphore_signal(p->sem);
// #else // SEM_USE_PTHREAD
// return dispatch_semaphore_signal(p->sem);
// #endif // SEM_USE_PTHREAD
// }
// int tsem_destroy(tsem_t *sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// if (!*sem) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// // abort();
// return 0;
// }
// struct tsem_s *p = *sem;
// if (!p->valid) {
// // fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// // sem); abort();
// return 0;
// }
// #ifdef SEM_USE_PTHREAD
// if (taosThreadMutexLock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// p->valid = 0;
// if (taosThreadCondDestroy(&p->cond)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// if (taosThreadMutexUnlock(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// if (taosThreadMutexDestroy(&p->lock)) {
// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem);
// abort();
// }
// #elif defined(SEM_USE_POSIX)
// char name[NAME_MAX - 4];
// snprintf(name, sizeof(name), "/t%ld", p->id);
// int r = sem_unlink(name);
// if (r) {
// int e = errno;
// fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem,
// e, strerror(e));
// abort();
// }
// #elif defined(SEM_USE_SEM)
// semaphore_destroy(sem_port, p->sem);
// #else // SEM_USE_PTHREAD
// #endif // SEM_USE_PTHREAD
// p->valid = 0;
// taosMemoryFree(p);
// *sem = NULL;
// return 0;
// }
typedef
struct
{
pthread_mutex_t
count_lock
;
pthread_cond_t
count_bump
;
unsigned
int
count
;
}
bosal_sem_t
;
int
tsem_init
(
tsem_t
*
psem
,
int
flags
,
unsigned
int
count
)
{
bosal_sem_t
*
pnewsem
;
int
result
;
pnewsem
=
(
bosal_sem_t
*
)
malloc
(
sizeof
(
bosal_sem_t
));
if
(
!
pnewsem
)
{
return
-
1
;
}
result
=
pthread_mutex_init
(
&
pnewsem
->
count_lock
,
NULL
);
if
(
result
)
{
free
(
pnewsem
);
return
result
;
}
result
=
pthread_cond_init
(
&
pnewsem
->
count_bump
,
NULL
);
if
(
result
)
{
pthread_mutex_destroy
(
&
pnewsem
->
count_lock
);
free
(
pnewsem
);
return
result
;
}
pnewsem
->
count
=
count
;
*
psem
=
(
tsem_t
)
pnewsem
;
return
0
;
}
static
void
once_init
(
void
)
{
int
r
=
0
;
r
=
taosThreadCreate
(
&
sem_thread
,
NULL
,
sem_thread_routine
,
NULL
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create thread
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
);
return
;
}
while
(
sem_inited
==
0
)
{
;
}
int
tsem_destroy
(
tsem_t
*
psem
)
{
bosal_sem_t
*
poldsem
;
if
(
!
psem
)
{
return
EINVAL
;
}
poldsem
=
(
bosal_sem_t
*
)
*
psem
;
pthread_mutex_destroy
(
&
poldsem
->
count_lock
);
pthread_cond_destroy
(
&
poldsem
->
count_bump
);
free
(
poldsem
);
return
0
;
}
#endif
struct
tsem_s
{
#ifdef SEM_USE_PTHREAD
TdThreadMutex
lock
;
TdThreadCond
cond
;
volatile
int64_t
val
;
#elif defined(SEM_USE_POSIX)
size_t
id
;
sem_t
*
sem
;
#elif defined(SEM_USE_SEM)
semaphore_t
sem
;
#else // SEM_USE_PTHREAD
dispatch_semaphore_t
sem
;
#endif // SEM_USE_PTHREAD
volatile
unsigned
int
valid
:
1
;
};
int
tsem_init
(
tsem_t
*
sem
,
int
pshared
,
unsigned
int
value
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
if
(
*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already initialized
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
(
struct
tsem_s
*
)
taosMemoryCalloc
(
1
,
sizeof
(
*
p
));
if
(
!
p
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==out of memory
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
int
tsem_post
(
tsem_t
*
psem
)
{
bosal_sem_t
*
pxsem
;
int
result
,
xresult
;
#ifdef SEM_USE_PTHREAD
int
r
=
taosThreadMutexInit
(
&
p
->
lock
,
NULL
);
do
{
if
(
r
)
break
;
r
=
taosThreadCondInit
(
&
p
->
cond
,
NULL
);
if
(
r
)
{
taosThreadMutexDestroy
(
&
p
->
lock
);
break
;
}
p
->
val
=
value
;
}
while
(
0
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#elif defined(SEM_USE_POSIX)
static
size_t
tick
=
0
;
do
{
size_t
id
=
atomic_add_fetch_64
(
&
tick
,
1
);
if
(
id
==
SEM_VALUE_MAX
)
{
atomic_store_64
(
&
tick
,
0
);
id
=
0
;
if
(
!
psem
)
{
return
EINVAL
;
}
char
name
[
NAME_MAX
-
4
];
snprintf
(
name
,
sizeof
(
name
),
"/t%ld"
,
id
);
p
->
sem
=
sem_open
(
name
,
O_CREAT
|
O_EXCL
,
pshared
,
value
);
p
->
id
=
id
;
if
(
p
->
sem
!=
SEM_FAILED
)
break
;
int
e
=
errno
;
if
(
e
==
EEXIST
)
continue
;
if
(
e
==
EINTR
)
continue
;
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created[%d]%s
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
abort
();
}
while
(
p
->
sem
==
SEM_FAILED
);
#elif defined(SEM_USE_SEM)
taosThreadOnce
(
&
sem_once
,
once_init
);
if
(
sem_inited
!=
1
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal resource init failed
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
errno
=
ENOMEM
;
return
-
1
;
}
kern_return_t
ret
=
semaphore_create
(
sem_port
,
&
p
->
sem
,
SYNC_POLICY_FIFO
,
value
);
if
(
ret
!=
KERN_SUCCESS
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==semophore_create failed
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
// we fail-fast here, because we have less-doc about semaphore_create for the moment
abort
();
}
#else // SEM_USE_PTHREAD
p
->
sem
=
dispatch_semaphore_create
(
value
);
if
(
p
->
sem
==
NULL
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#endif // SEM_USE_PTHREAD
pxsem
=
(
bosal_sem_t
*
)
*
psem
;
p
->
valid
=
1
;
result
=
pthread_mutex_lock
(
&
pxsem
->
count_lock
);
if
(
result
)
{
return
result
;
}
pxsem
->
count
=
pxsem
->
count
+
1
;
*
sem
=
p
;
xresult
=
pthread_cond_signal
(
&
pxsem
->
count_bump
)
;
return
0
;
result
=
pthread_mutex_unlock
(
&
pxsem
->
count_lock
);
if
(
result
)
{
return
result
;
}
if
(
xresult
)
{
errno
=
xresult
;
return
-
1
;
}
return
0
;
}
int
tsem_wait
(
tsem_t
*
sem
)
{
if
(
!*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not initialized
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already destroyed
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#ifdef SEM_USE_PTHREAD
if
(
taosThreadMutexLock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
val
-=
1
;
if
(
p
->
val
<
0
)
{
if
(
taosThreadCondWait
(
&
p
->
cond
,
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
int
tsem_trywait
(
tsem_t
*
psem
)
{
bosal_sem_t
*
pxsem
;
int
result
,
xresult
;
if
(
!
psem
)
{
return
EINVAL
;
}
}
if
(
taosThreadMutexUnlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
return
0
;
#elif defined(SEM_USE_POSIX)
return
sem_wait
(
p
->
sem
);
#elif defined(SEM_USE_SEM)
return
semaphore_wait
(
p
->
sem
);
#else // SEM_USE_PTHREAD
return
dispatch_semaphore_wait
(
p
->
sem
,
DISPATCH_TIME_FOREVER
);
#endif // SEM_USE_PTHREAD
}
pxsem
=
(
bosal_sem_t
*
)
*
psem
;
int
tsem_post
(
tsem_t
*
sem
)
{
if
(
!*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not initialized
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already destroyed
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#ifdef SEM_USE_PTHREAD
if
(
taosThreadMutexLock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
val
+=
1
;
if
(
p
->
val
<=
0
)
{
if
(
taosThreadCondSignal
(
&
p
->
cond
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
result
=
pthread_mutex_lock
(
&
pxsem
->
count_lock
);
if
(
result
)
{
return
result
;
}
}
if
(
taosThreadMutexUnlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
return
0
;
#elif defined(SEM_USE_POSIX)
return
sem_post
(
p
->
sem
);
#elif defined(SEM_USE_SEM)
return
semaphore_signal
(
p
->
sem
);
#else // SEM_USE_PTHREAD
return
dispatch_semaphore_signal
(
p
->
sem
);
#endif // SEM_USE_PTHREAD
}
xresult
=
0
;
int
tsem_destroy
(
tsem_t
*
sem
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
if
(
!*
sem
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem);
// abort();
return
0
;
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__,
// sem); abort();
if
(
pxsem
->
count
>
0
)
{
pxsem
->
count
--
;
}
else
{
xresult
=
EAGAIN
;
}
result
=
pthread_mutex_unlock
(
&
pxsem
->
count_lock
);
if
(
result
)
{
return
result
;
}
if
(
xresult
)
{
errno
=
xresult
;
return
-
1
;
}
return
0
;
}
#ifdef SEM_USE_PTHREAD
if
(
taosThreadMutexLock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
valid
=
0
;
if
(
taosThreadCondDestroy
(
&
p
->
cond
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
if
(
taosThreadMutexUnlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
if
(
taosThreadMutexDestroy
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#elif defined(SEM_USE_POSIX)
char
name
[
NAME_MAX
-
4
];
snprintf
(
name
,
sizeof
(
name
),
"/t%ld"
,
p
->
id
);
int
r
=
sem_unlink
(
name
);
if
(
r
)
{
int
e
=
errno
;
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==unlink failed[%d]%s
\n
"
,
taosDirEntryBaseName
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
abort
();
}
#elif defined(SEM_USE_SEM)
semaphore_destroy
(
sem_port
,
p
->
sem
);
#else // SEM_USE_PTHREAD
#endif // SEM_USE_PTHREAD
}
p
->
valid
=
0
;
taosMemoryFree
(
p
);
int
tsem_wait
(
tsem_t
*
psem
)
{
bosal_sem_t
*
pxsem
;
int
result
,
xresult
;
*
sem
=
NULL
;
return
0
;
if
(
!
psem
)
{
return
EINVAL
;
}
pxsem
=
(
bosal_sem_t
*
)
*
psem
;
result
=
pthread_mutex_lock
(
&
pxsem
->
count_lock
);
if
(
result
)
{
return
result
;
}
xresult
=
0
;
if
(
pxsem
->
count
==
0
)
{
xresult
=
pthread_cond_wait
(
&
pxsem
->
count_bump
,
&
pxsem
->
count_lock
);
}
if
(
!
xresult
)
{
if
(
pxsem
->
count
>
0
)
{
pxsem
->
count
--
;
}
}
result
=
pthread_mutex_unlock
(
&
pxsem
->
count_lock
);
if
(
result
)
{
return
result
;
}
if
(
xresult
)
{
errno
=
xresult
;
return
-
1
;
}
return
0
;
}
bool
taosCheckPthreadValid
(
TdThread
thread
)
{
uint64_t
id
=
0
;
int
r
=
TdThreadhreadid_np
(
thread
,
&
id
);
return
r
?
false
:
true
;
int
tsem_timewait
(
tsem_t
*
psem
,
int64_t
nanosecs
)
{
struct
timespec
abstim
=
{
.
tv_sec
=
0
,
.
tv_nsec
=
nanosecs
,
};
bosal_sem_t
*
pxsem
;
int
result
,
xresult
;
if
(
!
psem
)
{
return
EINVAL
;
}
pxsem
=
(
bosal_sem_t
*
)
*
psem
;
result
=
pthread_mutex_lock
(
&
pxsem
->
count_lock
);
if
(
result
)
{
return
result
;
}
xresult
=
0
;
if
(
pxsem
->
count
==
0
)
{
xresult
=
pthread_cond_timedwait
(
&
pxsem
->
count_bump
,
&
pxsem
->
count_lock
,
&
abstim
);
}
if
(
!
xresult
)
{
if
(
pxsem
->
count
>
0
)
{
pxsem
->
count
--
;
}
}
result
=
pthread_mutex_unlock
(
&
pxsem
->
count_lock
);
if
(
result
)
{
return
result
;
}
if
(
xresult
)
{
errno
=
xresult
;
return
-
1
;
}
return
0
;
}
bool
taosCheckPthreadValid
(
TdThread
thread
)
{
int32_t
ret
=
taosThreadKill
(
thread
,
0
);
if
(
ret
==
ESRCH
)
return
false
;
if
(
ret
==
EINVAL
)
return
false
;
// alive
return
true
;
}
int64_t
taosGetSelfPthreadId
()
{
uint64_t
id
;
TdThreadhreadid_np
(
0
,
&
id
);
return
(
int64_t
)
id
;
TdThread
thread
=
taosThreadSelf
();
return
(
int64_t
)
thread
;
}
int64_t
taosGetPthreadId
(
TdThread
thread
)
{
return
(
int64_t
)
thread
;
}
...
...
source/os/src/osSignal.c
浏览文件 @
b9a1c674
...
...
@@ -73,6 +73,10 @@ void taosIgnSignal(int32_t signum) { signal(signum, SIG_IGN); }
void
taosDflSignal
(
int32_t
signum
)
{
signal
(
signum
,
SIG_DFL
);
}
void
taosKillChildOnParentStopped
()
{
prctl
(
PR_SET_PDEATHSIG
,
SIGKILL
);
}
void
taosKillChildOnParentStopped
()
{
#ifndef _TD_DARWIN_64
prctl
(
PR_SET_PDEATHSIG
,
SIGKILL
);
#endif
}
#endif
source/os/src/osSocket.c
浏览文件 @
b9a1c674
...
...
@@ -49,6 +49,14 @@
#define INVALID_SOCKET -1
#endif
typedef
struct
TdSocket
{
#if SOCKET_WITH_LOCK
TdThreadRwlock
rwlock
;
#endif
int
refId
;
SocketFd
fd
;
}
*
TdSocketPtr
,
TdSocket
;
typedef
struct
TdSocketServer
{
#if SOCKET_WITH_LOCK
TdThreadRwlock
rwlock
;
...
...
@@ -1029,60 +1037,6 @@ int32_t taosGetSocketName(TdSocketPtr pSocket, struct sockaddr *destAddr, int *a
return
getsockname
(
pSocket
->
fd
,
destAddr
,
addrLen
);
}
TdEpollPtr
taosCreateEpoll
(
int32_t
size
)
{
EpollFd
fd
=
-
1
;
#ifdef WINDOWS
assert
(
0
);
#else
fd
=
epoll_create
(
size
);
#endif
if
(
fd
<
0
)
{
return
NULL
;
}
TdEpollPtr
pEpoll
=
(
TdEpollPtr
)
taosMemoryMalloc
(
sizeof
(
TdEpoll
));
if
(
pEpoll
==
NULL
)
{
taosCloseSocketNoCheck1
(
fd
);
return
NULL
;
}
pEpoll
->
fd
=
fd
;
pEpoll
->
refId
=
0
;
return
pEpoll
;
}
int32_t
taosCtlEpoll
(
TdEpollPtr
pEpoll
,
int32_t
epollOperate
,
TdSocketPtr
pSocket
,
struct
epoll_event
*
event
)
{
int32_t
code
=
-
1
;
if
(
pEpoll
==
NULL
||
pEpoll
->
fd
<
0
)
{
return
-
1
;
}
#ifdef WINDOWS
assert
(
0
);
#else
code
=
epoll_ctl
(
pEpoll
->
fd
,
epollOperate
,
pSocket
->
fd
,
event
);
#endif
return
code
;
}
int32_t
taosWaitEpoll
(
TdEpollPtr
pEpoll
,
struct
epoll_event
*
event
,
int32_t
maxEvents
,
int32_t
timeout
)
{
int32_t
code
=
-
1
;
if
(
pEpoll
==
NULL
||
pEpoll
->
fd
<
0
)
{
return
-
1
;
}
#ifdef WINDOWS
assert
(
0
);
#else
code
=
epoll_wait
(
pEpoll
->
fd
,
event
,
maxEvents
,
timeout
);
#endif
return
code
;
}
int32_t
taosCloseEpoll
(
TdEpollPtr
*
ppEpoll
)
{
int32_t
code
;
if
(
ppEpoll
==
NULL
||
*
ppEpoll
==
NULL
||
(
*
ppEpoll
)
->
fd
<
0
)
{
return
-
1
;
}
code
=
taosCloseSocketNoCheck1
((
*
ppEpoll
)
->
fd
);
(
*
ppEpoll
)
->
fd
=
-
1
;
taosMemoryFree
(
*
ppEpoll
);
return
code
;
}
/*
* Set TCP connection timeout per-socket level.
* ref [https://github.com/libuv/help/issues/54]
...
...
@@ -1100,6 +1054,11 @@ int32_t taosCreateSocketWithTimeout(uint32_t timeout) {
if
(
0
!=
setsockopt
(
fd
,
IPPROTO_TCP
,
TCP_MAXRT
,
(
char
*
)
&
timeout
,
sizeof
(
timeout
)))
{
return
-
1
;
}
#elif defined(_TD_DARWIN_64)
uint32_t
conn_timeout_ms
=
timeout
*
1000
;
if
(
0
!=
setsockopt
(
fd
,
IPPROTO_TCP
,
TCP_CONNECTIONTIMEOUT
,
(
char
*
)
&
conn_timeout_ms
,
sizeof
(
conn_timeout_ms
)))
{
return
-
1
;
}
#else // Linux like systems
uint32_t
conn_timeout_ms
=
timeout
*
1000
;
if
(
0
!=
setsockopt
(
fd
,
IPPROTO_TCP
,
TCP_USER_TIMEOUT
,
(
char
*
)
&
conn_timeout_ms
,
sizeof
(
conn_timeout_ms
)))
{
...
...
source/os/src/osThread.c
浏览文件 @
b9a1c674
...
...
@@ -157,9 +157,9 @@ int32_t taosThreadKill(TdThread thread, int32_t sig) {
return
pthread_kill
(
thread
,
sig
);
}
int32_t
taosThreadMutexConsistent
(
TdThreadMutex
*
mutex
)
{
return
pthread_mutex_consistent
(
mutex
);
}
//
int32_t taosThreadMutexConsistent(TdThreadMutex* mutex) {
//
return pthread_mutex_consistent(mutex);
//
}
int32_t
taosThreadMutexDestroy
(
TdThreadMutex
*
mutex
)
{
return
pthread_mutex_destroy
(
mutex
);
...
...
@@ -173,9 +173,9 @@ int32_t taosThreadMutexLock(TdThreadMutex * mutex) {
return
pthread_mutex_lock
(
mutex
);
}
int32_t
taosThreadMutexTimedLock
(
TdThreadMutex
*
mutex
,
const
struct
timespec
*
abstime
)
{
return
pthread_mutex_timedlock
(
mutex
,
abstime
);
}
//
int32_t taosThreadMutexTimedLock(TdThreadMutex * mutex, const struct timespec *abstime) {
//
return pthread_mutex_timedlock(mutex, abstime);
//
}
int32_t
taosThreadMutexTryLock
(
TdThreadMutex
*
mutex
)
{
return
pthread_mutex_trylock
(
mutex
);
...
...
@@ -193,9 +193,9 @@ int32_t taosThreadMutexAttrGetPshared(const TdThreadMutexAttr * attr, int32_t *p
return
pthread_mutexattr_getpshared
(
attr
,
pshared
);
}
int32_t
taosThreadMutexAttrGetRobust
(
const
TdThreadMutexAttr
*
attr
,
int32_t
*
robust
)
{
return
pthread_mutexattr_getrobust
(
attr
,
robust
);
}
//
int32_t taosThreadMutexAttrGetRobust(const TdThreadMutexAttr * attr, int32_t * robust) {
//
return pthread_mutexattr_getrobust(attr, robust);
//
}
int32_t
taosThreadMutexAttrGetType
(
const
TdThreadMutexAttr
*
attr
,
int32_t
*
kind
)
{
return
pthread_mutexattr_gettype
(
attr
,
kind
);
...
...
@@ -209,9 +209,9 @@ int32_t taosThreadMutexAttrSetPshared(TdThreadMutexAttr * attr, int32_t pshared)
return
pthread_mutexattr_setpshared
(
attr
,
pshared
);
}
int32_t
taosThreadMutexAttrSetRobust
(
TdThreadMutexAttr
*
attr
,
int32_t
robust
)
{
return
pthread_mutexattr_setrobust
(
attr
,
robust
);
}
//
int32_t taosThreadMutexAttrSetRobust(TdThreadMutexAttr * attr, int32_t robust) {
//
return pthread_mutexattr_setrobust(attr, robust);
//
}
int32_t
taosThreadMutexAttrSetType
(
TdThreadMutexAttr
*
attr
,
int32_t
kind
)
{
return
pthread_mutexattr_settype
(
attr
,
kind
);
...
...
@@ -233,13 +233,13 @@ int32_t taosThreadRwlockRdlock(TdThreadRwlock * rwlock) {
return
pthread_rwlock_rdlock
(
rwlock
);
}
int32_t
taosThreadRwlockTimedRdlock
(
TdThreadRwlock
*
rwlock
,
const
struct
timespec
*
abstime
)
{
return
pthread_rwlock_timedrdlock
(
rwlock
,
abstime
);
}
//
int32_t taosThreadRwlockTimedRdlock(TdThreadRwlock * rwlock, const struct timespec *abstime) {
//
return pthread_rwlock_timedrdlock(rwlock, abstime);
//
}
int32_t
taosThreadRwlockTimedWrlock
(
TdThreadRwlock
*
rwlock
,
const
struct
timespec
*
abstime
)
{
return
pthread_rwlock_timedwrlock
(
rwlock
,
abstime
);
}
//
int32_t taosThreadRwlockTimedWrlock(TdThreadRwlock * rwlock, const struct timespec *abstime) {
//
return pthread_rwlock_timedwrlock(rwlock, abstime);
//
}
int32_t
taosThreadRwlockTryRdlock
(
TdThreadRwlock
*
rwlock
)
{
return
pthread_rwlock_tryrdlock
(
rwlock
);
...
...
@@ -303,7 +303,7 @@ int32_t taosThreadSpinDestroy(TdThreadSpinlock * lock) {
int32_t
taosThreadSpinInit
(
TdThreadSpinlock
*
lock
,
int32_t
pshared
)
{
#ifdef TD_USE_SPINLOCK_AS_MUTEX
assert
(
pshared
==
NULL
);
assert
(
pshared
==
0
);
return
pthread_mutex_init
((
pthread_mutex_t
*
)
lock
,
NULL
);
#else
return
pthread_spin_init
((
pthread_spinlock_t
*
)
lock
,
pshared
);
...
...
source/util/src/tcache.c
浏览文件 @
b9a1c674
...
...
@@ -417,8 +417,8 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
if
(
pNode
==
NULL
)
{
pushfrontNodeInEntryList
(
pe
,
pNode1
);
atomic_add_fetch_
64
(
&
pCacheObj
->
numOfElems
,
1
);
atomic_add_fetch_
64
(
&
pCacheObj
->
sizeInBytes
,
pNode1
->
size
);
atomic_add_fetch_
ptr
(
&
pCacheObj
->
numOfElems
,
1
);
atomic_add_fetch_
ptr
(
&
pCacheObj
->
sizeInBytes
,
pNode1
->
size
);
uDebug
(
"cache:%s, key:%p, %p added into cache, added:%"
PRIu64
", expire:%"
PRIu64
", totalNum:%d sizeInBytes:%"
PRId64
"bytes size:%"
PRId64
"bytes"
,
pCacheObj
->
name
,
key
,
pNode1
->
data
,
pNode1
->
addedTime
,
pNode1
->
expireTime
,
(
int32_t
)
pCacheObj
->
numOfElems
,
...
...
@@ -667,7 +667,7 @@ void doTraverseElems(SCacheObj *pCacheObj, bool (*fp)(void *param, SCacheNode *p
pEntry
->
next
=
next
;
pEntry
->
num
-=
1
;
atomic_sub_fetch_
64
(
&
pCacheObj
->
numOfElems
,
1
);
atomic_sub_fetch_
ptr
(
&
pCacheObj
->
numOfElems
,
1
);
pNode
=
next
;
}
}
...
...
source/util/src/ttimer.c
浏览文件 @
b9a1c674
...
...
@@ -132,7 +132,7 @@ static timer_map_t timerMap;
static
uintptr_t
getNextTimerId
()
{
uintptr_t
id
;
do
{
id
=
(
uintptr_t
)
atomic_add_fetch_ptr
((
void
**
)
&
nextTimerId
,
(
void
*
)
1
);
id
=
(
uintptr_t
)
atomic_add_fetch_ptr
((
void
**
)
&
nextTimerId
,
1
);
}
while
(
id
==
0
);
return
id
;
}
...
...
tools/shell/src/shellArguments.c
浏览文件 @
b9a1c674
...
...
@@ -13,6 +13,10 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef _TD_DARWIN_64
#include <pwd.h>
#endif
#include "shellInt.h"
#define SHELL_HOST "The auth string to use when connecting to the server."
...
...
tools/shell/src/shellEngine.c
浏览文件 @
b9a1c674
...
...
@@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define ALLOW_FORBID_FUNC
#define _BSD_SOURCE
#define _GNU_SOURCE
#define _XOPEN_SOURCE
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录