Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
27351366
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
27351366
编写于
1月 25, 2021
作者:
S
slguan
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'freemine/mac' into feature/mac
上级
2130b01b
15b7973e
变更
54
显示空白变更内容
内联
并排
Showing
54 changed file
with
2105 addition
and
58 deletion
+2105
-58
cmake/define.inc
cmake/define.inc
+2
-0
deps/CMakeLists.txt
deps/CMakeLists.txt
+5
-1
packaging/tools/install_client.sh
packaging/tools/install_client.sh
+1
-1
packaging/tools/make_install.sh
packaging/tools/make_install.sh
+9
-5
src/balance/CMakeLists.txt
src/balance/CMakeLists.txt
+4
-0
src/client/CMakeLists.txt
src/client/CMakeLists.txt
+24
-2
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+2
-0
src/client/src/tscSql.c
src/client/src/tscSql.c
+4
-0
src/client/src/tscUtil.c
src/client/src/tscUtil.c
+8
-0
src/cq/CMakeLists.txt
src/cq/CMakeLists.txt
+11
-1
src/dnode/CMakeLists.txt
src/dnode/CMakeLists.txt
+40
-1
src/dnode/src/dnodeSystem.c
src/dnode/src/dnodeSystem.c
+4
-0
src/dnode/src/dnodeTelemetry.c
src/dnode/src/dnodeTelemetry.c
+7
-0
src/kit/shell/CMakeLists.txt
src/kit/shell/CMakeLists.txt
+8
-5
src/kit/shell/src/shellDarwin.c
src/kit/shell/src/shellDarwin.c
+5
-0
src/kit/taosdemo/CMakeLists.txt
src/kit/taosdemo/CMakeLists.txt
+10
-1
src/kit/taosdemox/CMakeLists.txt
src/kit/taosdemox/CMakeLists.txt
+28
-6
src/kit/taosdump/CMakeLists.txt
src/kit/taosdump/CMakeLists.txt
+11
-1
src/mnode/CMakeLists.txt
src/mnode/CMakeLists.txt
+12
-1
src/mnode/src/mnodeCluster.c
src/mnode/src/mnodeCluster.c
+8
-0
src/os/inc/eok.h
src/os/inc/eok.h
+93
-0
src/os/inc/osDarwin.h
src/os/inc/osDarwin.h
+17
-6
src/os/inc/osDef.h
src/os/inc/osDef.h
+3
-2
src/os/src/darwin/darwinEnv.c
src/os/src/darwin/darwinEnv.c
+1
-0
src/os/src/darwin/darwinSemphone.c
src/os/src/darwin/darwinSemphone.c
+251
-10
src/os/src/darwin/darwinTimer.c
src/os/src/darwin/darwinTimer.c
+87
-0
src/os/src/darwin/eok.c
src/os/src/darwin/eok.c
+893
-0
src/os/src/detail/CMakeLists.txt
src/os/src/detail/CMakeLists.txt
+1
-0
src/os/src/detail/osSocket.c
src/os/src/detail/osSocket.c
+1
-1
src/os/src/detail/osTimer.c
src/os/src/detail/osTimer.c
+5
-1
src/plugins/http/CMakeLists.txt
src/plugins/http/CMakeLists.txt
+17
-2
src/plugins/http/src/httpServer.c
src/plugins/http/src/httpServer.c
+50
-0
src/plugins/monitor/CMakeLists.txt
src/plugins/monitor/CMakeLists.txt
+12
-2
src/plugins/mqtt/CMakeLists.txt
src/plugins/mqtt/CMakeLists.txt
+12
-0
src/query/CMakeLists.txt
src/query/CMakeLists.txt
+5
-0
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+18
-1
src/rpc/test/CMakeLists.txt
src/rpc/test/CMakeLists.txt
+14
-0
src/sync/CMakeLists.txt
src/sync/CMakeLists.txt
+14
-1
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+1
-1
src/sync/src/syncTcp.c
src/sync/src/syncTcp.c
+8
-0
src/sync/test/CMakeLists.txt
src/sync/test/CMakeLists.txt
+0
-1
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+5
-1
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+4
-0
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+18
-0
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+10
-1
src/util/CMakeLists.txt
src/util/CMakeLists.txt
+2
-1
src/util/src/tsocket.c
src/util/src/tsocket.c
+3
-0
src/vnode/CMakeLists.txt
src/vnode/CMakeLists.txt
+6
-0
src/wal/CMakeLists.txt
src/wal/CMakeLists.txt
+7
-1
src/wal/test/CMakeLists.txt
src/wal/test/CMakeLists.txt
+8
-0
tests/comparisonTest/tdengine/CMakeLists.txt
tests/comparisonTest/tdengine/CMakeLists.txt
+5
-0
tests/examples/c/CMakeLists.txt
tests/examples/c/CMakeLists.txt
+10
-0
tests/examples/c/demo.c
tests/examples/c/demo.c
+1
-1
tests/examples/c/epoll.c
tests/examples/c/epoll.c
+320
-0
未找到文件。
cmake/define.inc
浏览文件 @
27351366
...
...
@@ -128,6 +128,8 @@ IF (TD_DARWIN_64)
SET
(
COMMON_FLAGS
"-std=gnu99 -Wall -Werror -Wno-missing-braces -fPIC -msse4.2 -D_FILE_OFFSET_BITS=64 -D_LARGE_FILE"
)
SET
(
DEBUG_FLAGS
"-O0 -g3 -DDEBUG"
)
SET
(
RELEASE_FLAGS
"-Og"
)
INCLUDE_DIRECTORIES
(
$
{
TD_COMMUNITY_DIR
}
/
deps
/
cJson
/
inc
)
INCLUDE_DIRECTORIES
(
$
{
TD_COMMUNITY_DIR
}
/
deps
/
lz4
/
inc
)
ENDIF
()
IF
(
TD_WINDOWS
)
...
...
deps/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -13,3 +13,7 @@ ADD_SUBDIRECTORY(MsvcLibX)
IF
(
TD_LINUX AND TD_MQTT
)
ADD_SUBDIRECTORY
(
MQTT-C
)
ENDIF
()
IF
(
TD_DARWIN AND TD_MQTT
)
ADD_SUBDIRECTORY
(
MQTT-C
)
ENDIF
()
packaging/tools/install_client.sh
浏览文件 @
27351366
...
...
@@ -21,7 +21,7 @@ else
cd
${
script_dir
}
script_dir
=
"
$(
pwd
)
"
data_dir
=
"/var/lib/taos"
log_dir
=
"~/TDengineLog"
log_dir
=
~/TDengineLog
fi
log_link_dir
=
"/usr/local/taos/log"
...
...
packaging/tools/make_install.sh
浏览文件 @
27351366
...
...
@@ -24,7 +24,7 @@ data_dir="/var/lib/taos"
if
[
"
$osType
"
!=
"Darwin"
]
;
then
log_dir
=
"/var/log/taos"
else
log_dir
=
"~/TDengineLog"
log_dir
=
~/TDengineLog
fi
data_link_dir
=
"/usr/local/taos/data"
...
...
@@ -178,7 +178,9 @@ function install_bin() {
function
install_lib
()
{
# Remove links
${
csudo
}
rm
-f
${
lib_link_dir
}
/libtaos.
*
||
:
if
[
"
$osType
"
!=
"Darwin"
]
;
then
${
csudo
}
rm
-f
${
lib64_link_dir
}
/libtaos.
*
||
:
fi
if
[
"
$osType
"
!=
"Darwin"
]
;
then
${
csudo
}
cp
${
binary_dir
}
/build/lib/libtaos.so.
${
verNumber
}
${
install_main_dir
}
/driver
&&
${
csudo
}
chmod
777
${
install_main_dir
}
/driver/
*
...
...
@@ -190,12 +192,14 @@ function install_lib() {
${
csudo
}
ln
-sf
${
lib64_link_dir
}
/libtaos.so.1
${
lib64_link_dir
}
/libtaos.so
fi
else
${
csudo
}
cp
${
binary_dir
}
/build/lib/libtaos.
*
${
install_main_dir
}
/driver
&&
${
csudo
}
chmod
777
${
install_main_dir
}
/driver/
*
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaos.
*
${
lib_link_dir
}
/libtaos.1.dylib
${
csudo
}
cp
-Rf
${
binary_dir
}
/build/lib/libtaos.
*
${
install_main_dir
}
/driver
&&
${
csudo
}
chmod
777
${
install_main_dir
}
/driver/
*
${
csudo
}
ln
-sf
${
install_main_dir
}
/driver/libtaos.
1.dylib
${
lib_link_dir
}
/libtaos.1.dylib
${
csudo
}
ln
-sf
${
lib_link_dir
}
/libtaos.1.dylib
${
lib_link_dir
}
/libtaos.dylib
fi
if
[
"
$osType
"
!=
"Darwin"
]
;
then
${
csudo
}
ldconfig
fi
}
function
install_header
()
{
...
...
src/balance/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -11,3 +11,7 @@ AUX_SOURCE_DIRECTORY(src SRC)
IF
(
TD_LINUX
)
ADD_LIBRARY
(
balance
${
SRC
}
)
ENDIF
()
IF
(
TD_DARWIN
)
ADD_LIBRARY
(
balance
${
SRC
}
)
ENDIF
()
src/client/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -28,6 +28,28 @@ IF (TD_LINUX)
ADD_SUBDIRECTORY
(
tests
)
ELSEIF
(
TD_DARWIN
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/jni/linux
)
# set the static lib name
ADD_LIBRARY
(
taos_static STATIC
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
taos_static common query trpc tutil pthread m
)
SET_TARGET_PROPERTIES
(
taos_static PROPERTIES OUTPUT_NAME
"taos_static"
)
SET_TARGET_PROPERTIES
(
taos_static PROPERTIES CLEAN_DIRECT_OUTPUT 1
)
# generate dynamic library (*.dylib)
ADD_LIBRARY
(
taos SHARED
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
taos common query trpc tutil pthread m
)
SET_TARGET_PROPERTIES
(
taos PROPERTIES CLEAN_DIRECT_OUTPUT 1
)
#set version of .dylib
#VERSION dylib version
#SOVERSION dylib version
#MESSAGE(STATUS "build version ${TD_VER_NUMBER}")
SET_TARGET_PROPERTIES
(
taos PROPERTIES VERSION
${
TD_VER_NUMBER
}
SOVERSION 1
)
ADD_SUBDIRECTORY
(
tests
)
ELSEIF
(
TD_WINDOWS
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/jni/windows
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/jni/windows/win32
)
...
...
@@ -49,12 +71,12 @@ ELSEIF (TD_DARWIN)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/deps/jni/linux
)
ADD_LIBRARY
(
taos_static STATIC
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
taos_static trpc tutil pthread m
)
TARGET_LINK_LIBRARIES
(
taos_static
query
trpc tutil pthread m
)
SET_TARGET_PROPERTIES
(
taos_static PROPERTIES OUTPUT_NAME
"taos_static"
)
# generate dynamic library (*.dylib)
ADD_LIBRARY
(
taos SHARED
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
taos trpc tutil pthread m
)
TARGET_LINK_LIBRARIES
(
taos
query
trpc tutil pthread m
)
SET_TARGET_PROPERTIES
(
taos PROPERTIES CLEAN_DIRECT_OUTPUT 1
)
...
...
src/client/src/tscSQLParser.c
浏览文件 @
27351366
...
...
@@ -13,10 +13,12 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef __APPLE__
#define _BSD_SOURCE
#define _XOPEN_SOURCE 500
#define _DEFAULT_SOURCE
#define _GNU_SOURCE
#endif // __APPLE__
#include "os.h"
#include "ttype.h"
...
...
src/client/src/tscSql.c
浏览文件 @
27351366
...
...
@@ -295,6 +295,10 @@ void taos_close(TAOS *taos) {
tscDebug
(
"%p HB is freed"
,
pHb
);
taosReleaseRef
(
tscObjRef
,
pHb
->
self
);
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
tsem_init
(
&
pHb
->
rspSem
,
0
,
0
);
#endif // __APPLE__
taos_free_result
(
pHb
);
}
}
...
...
src/client/src/tscUtil.c
浏览文件 @
27351366
...
...
@@ -1942,6 +1942,10 @@ SSqlObj* createSimpleSubObj(SSqlObj* pSql, __async_cb_func_t fp, void* param, in
}
if
(
tscAddSubqueryInfo
(
pCmd
)
!=
TSDB_CODE_SUCCESS
)
{
#ifdef __APPLE__
// to satisfy later tsem_destroy in taos_free_result
tsem_init
(
&
pNew
->
rspSem
,
0
,
0
);
#endif // __APPLE__
tscFreeSqlObj
(
pNew
);
return
NULL
;
}
...
...
@@ -2508,7 +2512,11 @@ bool tscSetSqlOwner(SSqlObj* pSql) {
SSqlRes
*
pRes
=
&
pSql
->
res
;
// set the sql object owner
#ifdef __APPLE__
pthread_t
threadId
=
(
pthread_t
)
taosGetSelfPthreadId
();
#else // __APPLE__
uint64_t
threadId
=
taosGetSelfPthreadId
();
#endif // __APPLE__
if
(
atomic_val_compare_exchange_64
(
&
pSql
->
owner
,
0
,
threadId
)
!=
0
)
{
pRes
->
code
=
TSDB_CODE_QRY_IN_EXEC
;
return
false
;
...
...
src/cq/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -15,3 +15,13 @@ IF (TD_LINUX)
ENDIF
()
ADD_SUBDIRECTORY
(
test
)
ENDIF
()
IF
(
TD_DARWIN
)
ADD_LIBRARY
(
tcq
${
SRC
}
)
IF
(
TD_SOMODE_STATIC
)
TARGET_LINK_LIBRARIES
(
tcq tutil common taos_static
)
ELSE
()
TARGET_LINK_LIBRARIES
(
tcq tutil common taos
)
ENDIF
()
ADD_SUBDIRECTORY
(
test
)
ENDIF
()
src/dnode/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -47,3 +47,42 @@ IF (TD_LINUX)
COMMENT
"prepare taosd environment"
)
ADD_CUSTOM_TARGET
(
${
PREPARE_ENV_TARGET
}
ALL WORKING_DIRECTORY
${
TD_EXECUTABLE_OUTPUT_PATH
}
DEPENDS
${
PREPARE_ENV_CMD
}
)
ENDIF
()
IF
(
TD_DARWIN
)
ADD_EXECUTABLE
(
taosd
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
taosd mnode monitor http tsdb twal vnode cJson lz4 balance sync
)
IF
(
TD_SOMODE_STATIC
)
TARGET_LINK_LIBRARIES
(
taosd taos_static
)
ELSE
()
TARGET_LINK_LIBRARIES
(
taosd taos
)
ENDIF
()
IF
(
TD_ACCOUNT
)
TARGET_LINK_LIBRARIES
(
taosd account
)
ENDIF
()
IF
(
TD_GRANT
)
TARGET_LINK_LIBRARIES
(
taosd grant
)
ENDIF
()
IF
(
TD_MQTT
)
TARGET_LINK_LIBRARIES
(
taosd mqtt
)
ENDIF
()
# SET(PREPARE_ENV_CMD "prepare_env_cmd")
# SET(PREPARE_ENV_TARGET "prepare_env_target")
# ADD_CUSTOM_COMMAND(OUTPUT ${PREPARE_ENV_CMD}
# POST_BUILD
# COMMAND echo "make test directory"
# DEPENDS taosd
# COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/cfg/
# COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/log/
# COMMAND ${CMAKE_COMMAND} -E make_directory ${TD_TESTS_OUTPUT_DIR}/data/
# COMMAND ${CMAKE_COMMAND} -E echo dataDir ${TD_TESTS_OUTPUT_DIR}/data > ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
# COMMAND ${CMAKE_COMMAND} -E echo logDir ${TD_TESTS_OUTPUT_DIR}/log >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
# COMMAND ${CMAKE_COMMAND} -E echo charset UTF-8 >> ${TD_TESTS_OUTPUT_DIR}/cfg/taos.cfg
# COMMENT "prepare taosd environment")
# ADD_CUSTOM_TARGET(${PREPARE_ENV_TARGET} ALL WORKING_DIRECTORY ${TD_EXECUTABLE_OUTPUT_PATH} DEPENDS ${PREPARE_ENV_CMD})
ENDIF
()
src/dnode/src/dnodeSystem.c
浏览文件 @
27351366
...
...
@@ -160,7 +160,11 @@ static void signal_handler(int32_t signum, siginfo_t *sigInfo, void *context) {
syslog
(
LOG_INFO
,
"Shut down signal is %d"
,
signum
);
syslog
(
LOG_INFO
,
"Shutting down TDengine service..."
);
// clean the system.
#ifdef __APPLE__
dInfo
(
"shut down signal is %d, sender PID:%d"
,
signum
,
sigInfo
->
si_pid
);
#else // __APPLE__
dInfo
(
"shut down signal is %d, sender PID:%d cmdline:%s"
,
signum
,
sigInfo
->
si_pid
,
taosGetCmdlineByPID
(
sigInfo
->
si_pid
));
#endif // __APPLE__
// protect the application from receive another signal
struct
sigaction
act
=
{{
0
}};
...
...
src/dnode/src/dnodeTelemetry.c
浏览文件 @
27351366
...
...
@@ -236,6 +236,13 @@ static void sendTelemetryReport() {
taosCloseSocket
(
fd
);
}
#ifdef __APPLE__
static
int
sem_timedwait
(
tsem_t
*
sem
,
struct
timespec
*
to
)
{
fprintf
(
stderr
,
"%s[%d]%s(): not implemented yet!
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
);
abort
();
}
#endif // __APPLE__
static
void
*
telemetryThread
(
void
*
param
)
{
struct
timespec
end
=
{
0
};
clock_gettime
(
CLOCK_REALTIME
,
&
end
);
...
...
src/kit/shell/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -37,7 +37,10 @@ ELSEIF (TD_DARWIN)
LIST
(
APPEND SRC ./src/shellCommand.c
)
LIST
(
APPEND SRC ./src/shellImport.c
)
ADD_EXECUTABLE
(
shell
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
shell taos_static
)
# linking with dylib
TARGET_LINK_LIBRARIES
(
shell taos
)
# linking taos statically
# TARGET_LINK_LIBRARIES(shell taos_static)
SET_TARGET_PROPERTIES
(
shell PROPERTIES OUTPUT_NAME taos
)
ENDIF
()
src/kit/shell/src/shellDarwin.c
浏览文件 @
27351366
...
...
@@ -21,6 +21,8 @@
#include "shellCommand.h"
#include "tkey.h"
#include "tscLog.h"
#define OPT_ABORT 1
/* �Cabort */
int
indicator
=
1
;
...
...
@@ -348,6 +350,9 @@ void *shellLoopQuery(void *arg) {
reset_terminal_mode
();
}
while
(
shellRunCommand
(
con
,
command
)
==
0
);
tfree
(
command
);
exitShell
();
pthread_cleanup_pop
(
1
);
return
NULL
;
...
...
src/kit/taosdemo/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -17,4 +17,13 @@ ELSEIF (TD_WINDOWS)
AUX_SOURCE_DIRECTORY
(
. SRC
)
ADD_EXECUTABLE
(
taosdemo
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
taosdemo taos_static
)
ELSEIF
(
TD_DARWIN
)
AUX_SOURCE_DIRECTORY
(
. SRC
)
ADD_EXECUTABLE
(
taosdemo
${
SRC
}
)
IF
(
TD_SOMODE_STATIC
)
TARGET_LINK_LIBRARIES
(
taosdemo taos_static
)
ELSE
()
TARGET_LINK_LIBRARIES
(
taosdemo taos
)
ENDIF
()
ENDIF
()
src/kit/taosdemox/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -23,3 +23,25 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES
(
taosdemox taos cJson
)
ENDIF
()
ENDIF
()
IF
(
TD_DARWIN
)
# missing a few dependencies, such as <argp.h>
# AUX_SOURCE_DIRECTORY(. SRC)
# ADD_EXECUTABLE(taosdemox ${SRC})
#
# #find_program(HAVE_CURL NAMES curl)
# IF ((NOT TD_ARM_64) AND (NOT TD_ARM_32))
# ADD_DEFINITIONS(-DTD_LOWA_CURL)
# LINK_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/libcurl/lib)
# ADD_LIBRARY(curl STATIC IMPORTED)
# SET_PROPERTY(TARGET curl PROPERTY IMPORTED_LOCATION ${TD_COMMUNITY_DIR}/deps/libcurl/lib/libcurl.a)
# TARGET_LINK_LIBRARIES(taosdemox curl)
# ENDIF ()
#
# IF (TD_SOMODE_STATIC)
# TARGET_LINK_LIBRARIES(taosdemox taos_static cJson)
# ELSE ()
# TARGET_LINK_LIBRARIES(taosdemox taos cJson)
# ENDIF ()
ENDIF
()
src/kit/taosdump/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -14,3 +14,13 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES
(
taosdump taos
)
ENDIF
()
ENDIF
()
IF
(
TD_DARWIN
)
# missing <argp.h> for macosx
# ADD_EXECUTABLE(taosdump ${SRC})
# IF (TD_SOMODE_STATIC)
# TARGET_LINK_LIBRARIES(taosdump taos_static)
# ELSE ()
# TARGET_LINK_LIBRARIES(taosdump taos)
# ENDIF ()
ENDIF
()
src/mnode/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -10,3 +10,14 @@ IF (TD_LINUX)
ADD_LIBRARY
(
mnode
${
SRC
}
)
ENDIF
()
IF
(
TD_DARWIN
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/query/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/dnode/inc
)
INCLUDE_DIRECTORIES
(
inc
)
AUX_SOURCE_DIRECTORY
(
src SRC
)
ADD_LIBRARY
(
mnode
${
SRC
}
)
ENDIF
()
src/mnode/src/mnodeCluster.c
浏览文件 @
27351366
...
...
@@ -138,6 +138,14 @@ void mnodeDecClusterRef(SClusterObj *pCluster) {
sdbDecRef
(
tsClusterSdb
,
pCluster
);
}
#ifdef __APPLE__
bool
taosGetSystemUid
(
char
*
uid
)
{
fprintf
(
stderr
,
"%s[%d]%s(): not implemented yet!
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
);
abort
();
return
false
;
}
#endif // __APPLE__
static
int32_t
mnodeCreateCluster
()
{
int32_t
numOfClusters
=
sdbGetNumOfRows
(
tsClusterSdb
);
if
(
numOfClusters
!=
0
)
return
TSDB_CODE_SUCCESS
;
...
...
src/os/inc/eok.h
0 → 100644
浏览文件 @
27351366
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _eok_h_fd274616_996c_400e_9023_ae70be881fa3_
#define _eok_h_fd274616_996c_400e_9023_ae70be881fa3_
#include <stdint.h>
#ifdef __cplusplus
extern
"C"
{
#endif
#ifdef __APPLE__
enum
EPOLL_EVENTS
{
EPOLLIN
=
0x001
,
#define EPOLLIN EPOLLIN
EPOLLPRI
=
0x002
,
#define EPOLLPRI EPOLLPRI
EPOLLOUT
=
0x004
,
#define EPOLLOUT EPOLLOUT
EPOLLRDNORM
=
0x040
,
#define EPOLLRDNORM EPOLLRDNORM
EPOLLRDBAND
=
0x080
,
#define EPOLLRDBAND EPOLLRDBAND
EPOLLWRNORM
=
0x100
,
#define EPOLLWRNORM EPOLLWRNORM
EPOLLWRBAND
=
0x200
,
#define EPOLLWRBAND EPOLLWRBAND
EPOLLMSG
=
0x400
,
#define EPOLLMSG EPOLLMSG
EPOLLERR
=
0x008
,
#define EPOLLERR EPOLLERR
EPOLLHUP
=
0x010
,
#define EPOLLHUP EPOLLHUP
EPOLLRDHUP
=
0x2000
,
#define EPOLLRDHUP EPOLLRDHUP
EPOLLEXCLUSIVE
=
1u
<<
28
,
#define EPOLLEXCLUSIVE EPOLLEXCLUSIVE
EPOLLWAKEUP
=
1u
<<
29
,
#define EPOLLWAKEUP EPOLLWAKEUP
EPOLLONESHOT
=
1u
<<
30
,
#define EPOLLONESHOT EPOLLONESHOT
EPOLLET
=
1u
<<
31
#define EPOLLET EPOLLET
};
/* Valid opcodes ( "op" parameter ) to issue to epoll_ctl(). */
#define EPOLL_CTL_ADD 1
/* Add a file descriptor to the interface. */
#define EPOLL_CTL_DEL 2
/* Remove a file descriptor from the interface. */
#define EPOLL_CTL_MOD 3
/* Change file descriptor epoll_event structure. */
typedef
union
epoll_data
{
void
*
ptr
;
int
fd
;
uint32_t
u32
;
uint64_t
u64
;
}
epoll_data_t
;
struct
epoll_event
{
uint32_t
events
;
/* Epoll events */
epoll_data_t
data
;
/* User data variable */
};
int
epoll_create
(
int
size
);
int
epoll_ctl
(
int
epfd
,
int
op
,
int
fd
,
struct
epoll_event
*
event
);
int
epoll_wait
(
int
epfd
,
struct
epoll_event
*
events
,
int
maxevents
,
int
timeout
);
int
epoll_close
(
int
epfd
);
#endif // __APPLE__
#ifdef __cplusplus
}
#endif
#endif // _eok_h_fd274616_996c_400e_9023_ae70be881fa3_
src/os/inc/osDarwin.h
浏览文件 @
27351366
...
...
@@ -75,11 +75,11 @@ extern "C" {
#define TAOS_OS_FUNC_FILE_SENDIFLE
#define TAOS_OS_FUNC_SEMPHONE
#define tsem_t dispatch_semaphore_t
int
tsem_init
(
dispatch_semaphore
_t
*
sem
,
int
pshared
,
unsigned
int
value
);
int
tsem_wait
(
dispatch_semaphore
_t
*
sem
);
int
tsem_post
(
dispatch_semaphore
_t
*
sem
);
int
tsem_destroy
(
dispatch_semaphore
_t
*
sem
);
typedef
struct
tsem_s
*
tsem_t
;
int
tsem_init
(
tsem
_t
*
sem
,
int
pshared
,
unsigned
int
value
);
int
tsem_wait
(
tsem
_t
*
sem
);
int
tsem_post
(
tsem
_t
*
sem
);
int
tsem_destroy
(
tsem
_t
*
sem
);
#define TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
#define TAOS_OS_FUNC_STRING_STR2INT64
...
...
@@ -91,7 +91,7 @@ extern "C" {
typedef
int
(
*
__compar_fn_t
)(
const
void
*
,
const
void
*
);
// for send function in tsocket.c
#define MSG_NOSIGNAL 0
//
#define MSG_NOSIGNAL 0
#define SO_NO_CHECK 0x1234
#define SOL_TCP 0x1234
#define TCP_KEEPIDLE 0x1234
...
...
@@ -100,6 +100,17 @@ typedef int(*__compar_fn_t)(const void *, const void *);
#define PTHREAD_MUTEX_RECURSIVE_NP PTHREAD_MUTEX_RECURSIVE
#endif
int64_t
tsosStr2int64
(
char
*
str
);
#include "eok.h"
void
taos_block_sigalrm
(
void
);
#ifdef __cplusplus
}
#endif
...
...
src/os/inc/osDef.h
浏览文件 @
27351366
...
...
@@ -90,11 +90,12 @@ extern "C" {
#ifdef _ISOC11_SOURCE
#define threadlocal _Thread_local
#elif defined(__APPLE__)
#define threadlocal
#define threadlocal
__thread
#elif defined(__GNUC__) && !defined(threadlocal)
#define threadlocal __thread
#else
#define threadlocal
// #define threadlocal
#error please follow with the thread-local implementation on the target platform
#endif
#ifdef __cplusplus
...
...
src/os/src/darwin/darwinEnv.c
浏览文件 @
27351366
...
...
@@ -30,3 +30,4 @@ void osInit() {
strcpy
(
tsScriptDir
,
"~/TDengine/cfg"
);
strcpy
(
tsOsName
,
"Darwin"
);
}
src/os/src/darwin/darwinSemphone.c
浏览文件 @
27351366
...
...
@@ -13,28 +13,269 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// fail-fast or let-it-crash philosophy
// https://en.wikipedia.org/wiki/Fail-fast
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
// experimentally, we follow log-and-crash here
#define _DEFAULT_SOURCE
#include "os.h"
int
tsem_init
(
dispatch_semaphore_t
*
sem
,
int
pshared
,
unsigned
int
value
)
{
*
sem
=
dispatch_semaphore_create
(
value
);
if
(
*
sem
==
NULL
)
{
// #define SEM_USE_PTHREAD
// #define SEM_USE_POSIX
#define SEM_USE_SEM
#ifdef SEM_USE_SEM
#include <mach/mach_init.h>
#include <mach/mach_error.h>
#include <mach/semaphore.h>
#include <mach/task.h>
static
pthread_t
sem_thread
;
static
pthread_once_t
sem_once
;
static
task_t
sem_port
;
static
volatile
int
sem_inited
=
0
;
static
semaphore_t
sem_exit
;
static
void
*
sem_thread_routine
(
void
*
arg
)
{
(
void
)
arg
;
sem_port
=
mach_task_self
();
kern_return_t
ret
=
semaphore_create
(
sem_port
,
&
sem_exit
,
SYNC_POLICY_FIFO
,
0
);
if
(
ret
!=
KERN_SUCCESS
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create sem_exit
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
);
sem_inited
=
-
1
;
return
NULL
;
}
sem_inited
=
1
;
semaphore_wait
(
sem_exit
);
return
NULL
;
}
static
void
once_init
(
void
)
{
int
r
=
0
;
r
=
pthread_create
(
&
sem_thread
,
NULL
,
sem_thread_routine
,
NULL
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create thread
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
);
return
;
}
while
(
sem_inited
==
0
)
{
;
}
}
#endif
struct
tsem_s
{
#ifdef SEM_USE_PTHREAD
pthread_mutex_t
lock
;
pthread_cond_t
cond
;
volatile
int64_t
val
;
#elif defined(SEM_USE_POSIX)
size_t
id
;
sem_t
*
sem
;
#elif defined(SEM_USE_SEM)
semaphore_t
sem
;
#else // SEM_USE_PTHREAD
dispatch_semaphore_t
sem
;
#endif // SEM_USE_PTHREAD
volatile
unsigned
int
valid
:
1
;
};
int
tsem_init
(
tsem_t
*
sem
,
int
pshared
,
unsigned
int
value
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==creating\n", basename(__FILE__), __LINE__, __func__, sem);
if
(
*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already initialized
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
(
struct
tsem_s
*
)
calloc
(
1
,
sizeof
(
*
p
));
if
(
!
p
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==out of memory
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#ifdef SEM_USE_PTHREAD
int
r
=
pthread_mutex_init
(
&
p
->
lock
,
NULL
);
do
{
if
(
r
)
break
;
r
=
pthread_cond_init
(
&
p
->
cond
,
NULL
);
if
(
r
)
{
pthread_mutex_destroy
(
&
p
->
lock
);
break
;
}
p
->
val
=
value
;
}
while
(
0
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#elif defined(SEM_USE_POSIX)
static
size_t
tick
=
0
;
do
{
size_t
id
=
atomic_add_fetch_64
(
&
tick
,
1
);
if
(
id
==
SEM_VALUE_MAX
)
{
atomic_store_64
(
&
tick
,
0
);
id
=
0
;
}
char
name
[
NAME_MAX
-
4
];
snprintf
(
name
,
sizeof
(
name
),
"/t%ld"
,
id
);
p
->
sem
=
sem_open
(
name
,
O_CREAT
|
O_EXCL
,
pshared
,
value
);
p
->
id
=
id
;
if
(
p
->
sem
!=
SEM_FAILED
)
break
;
int
e
=
errno
;
if
(
e
==
EEXIST
)
continue
;
if
(
e
==
EINTR
)
continue
;
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created[%d]%s
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
abort
();
}
while
(
p
->
sem
==
SEM_FAILED
);
#elif defined(SEM_USE_SEM)
pthread_once
(
&
sem_once
,
once_init
);
if
(
sem_inited
!=
1
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal resource init failed
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
errno
=
ENOMEM
;
return
-
1
;
}
else
{
return
0
;
}
kern_return_t
ret
=
semaphore_create
(
sem_port
,
&
p
->
sem
,
SYNC_POLICY_FIFO
,
0
);
if
(
ret
!=
KERN_SUCCESS
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==semophore_create failed
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
// we fail-fast here, because we have less-doc about semaphore_create for the moment
abort
();
}
#else // SEM_USE_PTHREAD
p
->
sem
=
dispatch_semaphore_create
(
value
);
if
(
p
->
sem
==
NULL
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not created
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#endif // SEM_USE_PTHREAD
p
->
valid
=
1
;
*
sem
=
p
;
return
0
;
}
int
tsem_wait
(
dispatch_semaphore_t
*
sem
)
{
dispatch_semaphore_wait
(
*
sem
,
DISPATCH_TIME_FOREVER
);
int
tsem_wait
(
tsem_t
*
sem
)
{
if
(
!*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not initialized
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already destroyed
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#ifdef SEM_USE_PTHREAD
if
(
pthread_mutex_lock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
val
-=
1
;
if
(
p
->
val
<
0
)
{
if
(
pthread_cond_wait
(
&
p
->
cond
,
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
}
if
(
pthread_mutex_unlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
return
0
;
#elif defined(SEM_USE_POSIX)
return
sem_wait
(
p
->
sem
);
#elif defined(SEM_USE_SEM)
return
semaphore_wait
(
p
->
sem
);
#else // SEM_USE_PTHREAD
return
dispatch_semaphore_wait
(
p
->
sem
,
DISPATCH_TIME_FOREVER
);
#endif // SEM_USE_PTHREAD
}
int
tsem_post
(
dispatch_semaphore_t
*
sem
)
{
dispatch_semaphore_signal
(
*
sem
);
int
tsem_post
(
tsem_t
*
sem
)
{
if
(
!*
sem
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==not initialized
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==already destroyed
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#ifdef SEM_USE_PTHREAD
if
(
pthread_mutex_lock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
val
+=
1
;
if
(
p
->
val
<=
0
)
{
if
(
pthread_cond_signal
(
&
p
->
cond
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
}
if
(
pthread_mutex_unlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
return
0
;
#elif defined(SEM_USE_POSIX)
return
sem_post
(
p
->
sem
);
#elif defined(SEM_USE_SEM)
return
semaphore_signal
(
p
->
sem
);
#else // SEM_USE_PTHREAD
return
dispatch_semaphore_signal
(
p
->
sem
);
#endif // SEM_USE_PTHREAD
}
int
tsem_destroy
(
dispatch_semaphore_t
*
sem
)
{
int
tsem_destroy
(
tsem_t
*
sem
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", basename(__FILE__), __LINE__, __func__, sem);
if
(
!*
sem
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", basename(__FILE__), __LINE__, __func__, sem);
// abort();
return
0
;
}
struct
tsem_s
*
p
=
*
sem
;
if
(
!
p
->
valid
)
{
// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", basename(__FILE__), __LINE__, __func__, sem);
// abort();
return
0
;
}
#ifdef SEM_USE_PTHREAD
if
(
pthread_mutex_lock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
p
->
valid
=
0
;
if
(
pthread_cond_destroy
(
&
p
->
cond
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
if
(
pthread_mutex_unlock
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
if
(
pthread_mutex_destroy
(
&
p
->
lock
))
{
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==internal logic error
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
);
abort
();
}
#elif defined(SEM_USE_POSIX)
char
name
[
NAME_MAX
-
4
];
snprintf
(
name
,
sizeof
(
name
),
"/t%ld"
,
p
->
id
);
int
r
=
sem_unlink
(
name
);
if
(
r
)
{
int
e
=
errno
;
fprintf
(
stderr
,
"==%s[%d]%s():[%p]==unlink failed[%d]%s
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
,
sem
,
e
,
strerror
(
e
));
abort
();
}
#elif defined(SEM_USE_SEM)
semaphore_destroy
(
sem_port
,
p
->
sem
);
#else // SEM_USE_PTHREAD
#endif // SEM_USE_PTHREAD
p
->
valid
=
0
;
free
(
p
);
*
sem
=
NULL
;
return
0
;
}
src/os/src/darwin/darwinTimer.c
浏览文件 @
27351366
...
...
@@ -13,9 +13,82 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// fail-fast or let-it-crash philosophy
// https://en.wikipedia.org/wiki/Fail-fast
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
// experimentally, we follow log-and-crash here
#define _DEFAULT_SOURCE
#include "os.h"
#if 1
#include <sys/event.h>
static
void
(
*
timer_callback
)(
int
);
static
int
timer_ms
=
0
;
static
pthread_t
timer_thread
;
static
int
timer_kq
=
-
1
;
static
volatile
int
timer_stop
=
0
;
static
void
*
timer_routine
(
void
*
arg
)
{
(
void
)
arg
;
int
r
=
0
;
struct
timespec
to
=
{
0
};
to
.
tv_sec
=
timer_ms
/
1000
;
to
.
tv_nsec
=
(
timer_ms
%
1000
)
*
1000000
;
while
(
!
timer_stop
)
{
struct
kevent64_s
kev
[
10
]
=
{
0
};
r
=
kevent64
(
timer_kq
,
NULL
,
0
,
kev
,
sizeof
(
kev
)
/
sizeof
(
kev
[
0
]),
0
,
&
to
);
if
(
r
!=
0
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==kevent64 failed
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
);
abort
();
}
timer_callback
(
SIGALRM
);
// just mock
}
return
NULL
;
}
int
taosInitTimer
(
void
(
*
callback
)(
int
),
int
ms
)
{
int
r
=
0
;
timer_ms
=
ms
;
timer_callback
=
callback
;
timer_kq
=
kqueue
();
if
(
timer_kq
==-
1
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create timer kq
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
);
// since no caller of this func checks the return value for the moment
abort
();
}
r
=
pthread_create
(
&
timer_thread
,
NULL
,
timer_routine
,
NULL
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to create timer thread
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
);
// since no caller of this func checks the return value for the moment
abort
();
}
return
0
;
}
void
taosUninitTimer
()
{
int
r
=
0
;
timer_stop
=
1
;
r
=
pthread_join
(
timer_thread
,
NULL
);
if
(
r
)
{
fprintf
(
stderr
,
"==%s[%d]%s()==failed to join timer thread
\n
"
,
basename
(
__FILE__
),
__LINE__
,
__func__
);
// since no caller of this func checks the return value for the moment
abort
();
}
close
(
timer_kq
);
timer_kq
=
-
1
;
}
void
taos_block_sigalrm
(
void
)
{
// we don't know if there's any specific API for SIGALRM to deliver to specific thread
// this implementation relies on kqueue rather than SIGALRM
}
#else
int
taosInitTimer
(
void
(
*
callback
)(
int
),
int
ms
)
{
signal
(
SIGALRM
,
callback
);
...
...
@@ -34,3 +107,17 @@ void taosUninitTimer() {
setitimer
(
ITIMER_REAL
,
&
tv
,
NULL
);
}
void
taos_block_sigalrm
(
void
)
{
// since SIGALRM has been used
// consideration: any better solution?
static
__thread
int
already_set
=
0
;
if
(
!
already_set
)
{
sigset_t
set
;
sigemptyset
(
&
set
);
sigaddset
(
&
set
,
SIGALRM
);
pthread_sigmask
(
SIG_BLOCK
,
&
set
,
NULL
);
already_set
=
1
;
}
}
#endif
src/os/src/darwin/eok.c
0 → 100644
浏览文件 @
27351366
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
// fail-fast or let-it-crash philosophy
// https://en.wikipedia.org/wiki/Fail-fast
// https://stackoverflow.com/questions/4393197/erlangs-let-it-crash-philosophy-applicable-elsewhere
// experimentally, we follow log-and-crash here
#include "eok.h"
#include "os.h"
#include <sys/event.h>
// #define BALANCE_CHECK_WHEN_CLOSE
#ifdef ENABLE_LOG
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
#define E(fmt, ...) do { \
fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", \
basename(__FILE__), __LINE__, __func__, \
errno, strerror(errno), \
##__VA_ARGS__); \
} while (0)
#else // !ENABLE_LOG
#define D(fmt, ...) (void)fmt
#define E(fmt, ...) (void)fmt
#endif // ENABLE_LOG
#define A(statement, fmt, ...) do { \
if (statement) break; \
fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", \
basename(__FILE__), __LINE__, __func__, \
#statement, errno, strerror(errno), \
##__VA_ARGS__); \
abort(); \
} while (0)
static
int
eok_dummy
=
0
;
typedef
struct
ep_over_kq_s
ep_over_kq_t
;
typedef
struct
eok_event_s
eok_event_t
;
struct
ep_over_kq_s
{
int
kq
;
// !!!
// idx in the eoks list
// used as pseudo-file-desciptor
// must be 'closed' with epoll_close
int
idx
;
ep_over_kq_t
*
next
;
int
sv
[
2
];
// 0 for read, 1 for write
// all registered 'epoll events, key by fd'
int
evs_count
;
eok_event_t
*
evs_head
;
eok_event_t
*
evs_tail
;
eok_event_t
*
evs_free
;
// all kev changes list pending to be processed by kevent64
// key by tuple (ident,filter), ident === fd in this case
struct
kevent64_s
*
kchanges
;
int
nchanges
;
int
ichanges
;
// kev eventslist for kevent64 to store active events
// they remain alive among kevent64 calls
struct
kevent64_s
*
kevslist
;
int
nevslist
;
pthread_mutex_t
lock
;
volatile
unsigned
int
lock_valid
:
1
;
volatile
unsigned
int
waiting
:
1
;
volatile
unsigned
int
changed
:
1
;
volatile
unsigned
int
wakenup
:
1
;
volatile
unsigned
int
stopping
:
1
;
};
struct
eok_event_s
{
int
fd
;
struct
epoll_event
epev
;
volatile
unsigned
int
changed
;
// 0:registered;1:add;2:mod;3:del
eok_event_t
*
next
;
eok_event_t
*
prev
;
};
typedef
struct
eoks_s
eoks_t
;
struct
eoks_s
{
pthread_mutex_t
lock
;
ep_over_kq_t
**
eoks
;
// note: this memory leaks when process terminates
int
neoks
;
// we can add an extra api to let user clean
ep_over_kq_t
*
eoks_free_list
;
// currently, we just keep it simple stupid
};
static
eoks_t
eoks
=
{
.
lock
=
PTHREAD_MUTEX_INITIALIZER
,
.
eoks
=
NULL
,
.
neoks
=
0
,
.
eoks_free_list
=
NULL
,
};
#ifdef ENABLE_LOG
static
const
char
*
op_str
(
int
op
)
{
switch
(
op
)
{
case
EPOLL_CTL_ADD
:
return
"EPOLL_CTL_ADD"
;
case
EPOLL_CTL_MOD
:
return
"EPOLL_CTL_MOD"
;
case
EPOLL_CTL_DEL
:
return
"EPOLL_CTL_DEL"
;
default:
return
"UNKNOWN"
;
}
}
static
__thread
char
buf_slots
[
10
][
1024
]
=
{
0
};
static
__thread
int
buf_slots_linelen
=
sizeof
(
buf_slots
[
0
])
/
sizeof
(
buf_slots
[
0
][
0
]);
static
__thread
int
buf_slots_count
=
sizeof
(
buf_slots
)
/
(
sizeof
(
buf_slots
[
0
])
/
sizeof
(
buf_slots
[
0
][
0
]));
static
const
char
*
events_str
(
uint32_t
events
,
int
slots
)
{
A
(
slots
>=
0
&&
slots
<
buf_slots_count
,
"internal logic error"
);
char
*
buf
=
buf_slots
[
slots
];
char
*
p
=
buf
;
size_t
len
=
buf_slots_linelen
;
int
n
=
0
;
buf
[
0
]
=
'\0'
;
// copied from <sys/epoll.h> on linux
// EPOLLIN = 0x001,
// #define EPOLLIN EPOLLIN
// EPOLLPRI = 0x002,
// #define EPOLLPRI EPOLLPRI
// EPOLLOUT = 0x004,
// #define EPOLLOUT EPOLLOUT
// EPOLLRDNORM = 0x040,
// #define EPOLLRDNORM EPOLLRDNORM
// EPOLLRDBAND = 0x080,
// #define EPOLLRDBAND EPOLLRDBAND
// EPOLLWRNORM = 0x100,
// #define EPOLLWRNORM EPOLLWRNORM
// EPOLLWRBAND = 0x200,
// #define EPOLLWRBAND EPOLLWRBAND
// EPOLLMSG = 0x400,
// #define EPOLLMSG EPOLLMSG
// EPOLLERR = 0x008,
// #define EPOLLERR EPOLLERR
// EPOLLHUP = 0x010,
// #define EPOLLHUP EPOLLHUP
// EPOLLRDHUP = 0x2000,
// #define EPOLLRDHUP EPOLLRDHUP
// EPOLLEXCLUSIVE = 1u << 28,
// #define EPOLLEXCLUSIVE EPOLLEXCLUSIVE
// EPOLLWAKEUP = 1u << 29,
// #define EPOLLWAKEUP EPOLLWAKEUP
// EPOLLONESHOT = 1u << 30,
// #define EPOLLONESHOT EPOLLONESHOT
// EPOLLET = 1u << 31
// #define EPOLLET EPOLLET
#define CHK_EV(ev) \
if (len>0 && (events & (ev))==(ev)) { \
n = snprintf(p, len, "%s%s", p!=buf ? "|" : "", #ev); \
p += n; \
len -= n; \
}
CHK_EV
(
EPOLLIN
);
CHK_EV
(
EPOLLPRI
);
CHK_EV
(
EPOLLOUT
);
CHK_EV
(
EPOLLRDNORM
);
CHK_EV
(
EPOLLRDBAND
);
CHK_EV
(
EPOLLWRNORM
);
CHK_EV
(
EPOLLWRBAND
);
CHK_EV
(
EPOLLMSG
);
CHK_EV
(
EPOLLERR
);
CHK_EV
(
EPOLLHUP
);
CHK_EV
(
EPOLLRDHUP
);
CHK_EV
(
EPOLLEXCLUSIVE
);
CHK_EV
(
EPOLLWAKEUP
);
CHK_EV
(
EPOLLONESHOT
);
CHK_EV
(
EPOLLET
);
#undef CHK_EV
return
buf
;
}
static
const
char
*
kev_flags_str
(
uint16_t
flags
,
int
slots
)
{
A
(
slots
>=
0
&&
slots
<
buf_slots_count
,
"internal logic error"
);
char
*
buf
=
buf_slots
[
slots
];
char
*
p
=
buf
;
size_t
len
=
buf_slots_linelen
;
int
n
=
0
;
buf
[
0
]
=
'\0'
;
// copied to <sys/event.h>
// #define EV_ADD 0x0001 /* add event to kq (implies enable) */
// #define EV_DELETE 0x0002 /* delete event from kq */
// #define EV_ENABLE 0x0004 /* enable event */
// #define EV_DISABLE 0x0008 /* disable event (not reported) */
// /* flags */
// #define EV_ONESHOT 0x0010 /* only report one occurrence */
// #define EV_CLEAR 0x0020 /* clear event state after reporting */
// #define EV_RECEIPT 0x0040 /* force immediate event output */
// /* ... with or without EV_ERROR */
// /* ... use KEVENT_FLAG_ERROR_EVENTS */
// /* on syscalls supporting flags */
// #define EV_DISPATCH 0x0080 /* disable event after reporting */
// #define EV_UDATA_SPECIFIC 0x0100 /* unique kevent per udata value */
// #define EV_DISPATCH2 (EV_DISPATCH | EV_UDATA_SPECIFIC)
// /* ... in combination with EV_DELETE */
// /* will defer delete until udata-specific */
// /* event enabled. EINPROGRESS will be */
// /* returned to indicate the deferral */
// #define EV_VANISHED 0x0200 /* report that source has vanished */
// /* ... only valid with EV_DISPATCH2 */
// #define EV_SYSFLAGS 0xF000 /* reserved by system */
// #define EV_FLAG0 0x1000 /* filter-specific flag */
// #define EV_FLAG1 0x2000 /* filter-specific flag */
// /* returned values */
// #define EV_EOF 0x8000 /* EOF detected */
// #define EV_ERROR 0x4000 /* error, data contains errno */
#define CHK_EV(ev) \
if (len>0 && (flags & (ev))==(ev)) { \
n = snprintf(p, len, "%s%s", p!=buf ? "|" : "", #ev); \
p += n; \
len -= n; \
}
CHK_EV
(
EV_ADD
);
CHK_EV
(
EV_DELETE
);
CHK_EV
(
EV_ENABLE
);
CHK_EV
(
EV_DISABLE
);
CHK_EV
(
EV_ONESHOT
);
CHK_EV
(
EV_CLEAR
);
CHK_EV
(
EV_RECEIPT
);
CHK_EV
(
EV_DISPATCH
);
CHK_EV
(
EV_UDATA_SPECIFIC
);
CHK_EV
(
EV_DISPATCH2
);
CHK_EV
(
EV_VANISHED
);
CHK_EV
(
EV_SYSFLAGS
);
CHK_EV
(
EV_FLAG0
);
CHK_EV
(
EV_FLAG1
);
CHK_EV
(
EV_EOF
);
CHK_EV
(
EV_ERROR
);
#undef CHK_EV
return
buf
;
}
#endif // ENABLE_LOG
static
ep_over_kq_t
*
eoks_alloc
(
void
);
static
void
eoks_free
(
ep_over_kq_t
*
eok
);
static
ep_over_kq_t
*
eoks_find
(
int
epfd
);
static
eok_event_t
*
eok_find_ev
(
ep_over_kq_t
*
eok
,
int
fd
);
static
eok_event_t
*
eok_calloc_ev
(
ep_over_kq_t
*
eok
);
static
void
eok_free_ev
(
ep_over_kq_t
*
eok
,
eok_event_t
*
ev
);
static
void
eok_wakeup
(
ep_over_kq_t
*
eok
);
static
int
eok_chgs_refresh
(
ep_over_kq_t
*
eok
,
eok_event_t
*
oev
,
eok_event_t
*
ev
,
struct
kevent64_s
*
krev
,
struct
kevent64_s
*
kwev
);
static
struct
kevent64_s
*
eok_alloc_eventslist
(
ep_over_kq_t
*
eok
,
int
maxevents
);
int
epoll_create
(
int
size
)
{
(
void
)
size
;
int
e
=
0
;
ep_over_kq_t
*
eok
=
eoks_alloc
();
if
(
!
eok
)
return
-
1
;
A
(
eok
->
kq
==-
1
,
"internal logic error"
);
A
(
eok
->
lock_valid
,
"internal logic error"
);
A
(
eok
->
idx
>=
0
&&
eok
->
idx
<
eoks
.
neoks
,
"internal logic error"
);
A
(
eok
->
next
==
NULL
,
"internal logic error"
);
A
(
eok
->
sv
[
0
]
==-
1
,
"internal logic error"
);
A
(
eok
->
sv
[
1
]
==-
1
,
"internal logic error"
);
eok
->
kq
=
kqueue
();
if
(
eok
->
kq
==-
1
)
{
e
=
errno
;
eoks_free
(
eok
);
errno
=
e
;
return
-
1
;
}
if
(
socketpair
(
AF_LOCAL
,
SOCK_STREAM
,
0
,
eok
->
sv
))
{
e
=
errno
;
eoks_free
(
eok
);
errno
=
e
;
return
-
1
;
}
struct
epoll_event
ev
=
{
0
};
ev
.
events
=
EPOLLIN
;
ev
.
data
.
ptr
=
&
eok_dummy
;
D
(
"epoll_create epfd:[%d] and sv0[%d]"
,
eok
->
idx
,
eok
->
sv
[
0
]);
if
(
epoll_ctl
(
eok
->
idx
,
EPOLL_CTL_ADD
,
eok
->
sv
[
0
],
&
ev
))
{
e
=
errno
;
epoll_close
(
eok
->
idx
);
errno
=
e
;
return
-
1
;
}
return
eok
->
idx
;
}
int
epoll_ctl
(
int
epfd
,
int
op
,
int
fd
,
struct
epoll_event
*
event
)
{
D
(
"epoll_ctling epfd:[%d], op:[%s], fd:[%d], events:[%04x:%s]"
,
epfd
,
op_str
(
op
),
fd
,
event
?
event
->
events
:
0
,
event
?
events_str
(
event
->
events
,
0
)
:
"NULL"
);
int
e
=
0
;
if
(
epfd
<
0
||
epfd
>=
eoks
.
neoks
)
{
errno
=
EBADF
;
return
-
1
;
}
if
(
fd
==-
1
)
{
errno
=
EBADF
;
return
-
1
;
}
if
(
event
&&
!
(
event
->
events
&
(
EPOLLIN
|
EPOLLERR
|
EPOLLHUP
|
EPOLLRDHUP
|
EPOLLOUT
)))
{
e
=
ENOTSUP
;
return
-
1
;
}
ep_over_kq_t
*
eok
=
eoks_find
(
epfd
);
if
(
!
eok
)
{
errno
=
EBADF
;
return
-
1
;
}
A
(
0
==
pthread_mutex_lock
(
&
eok
->
lock
),
""
);
do
{
eok_event_t
*
oev
=
eok_find_ev
(
eok
,
fd
);
if
(
op
==
EPOLL_CTL_ADD
&&
oev
)
{
e
=
EEXIST
;
break
;
}
if
(
op
!=
EPOLL_CTL_ADD
&&
!
oev
)
{
e
=
ENOENT
;
break
;
}
if
(
op
!=
EPOLL_CTL_DEL
&&
!
event
)
{
e
=
EINVAL
;
break
;
}
// prepare krev/kwev
struct
kevent64_s
krev
=
{
0
};
struct
kevent64_s
kwev
=
{
0
};
krev
.
ident
=
-
1
;
kwev
.
ident
=
-
1
;
uint16_t
flags
=
0
;
// prepare internal eok event
eok_event_t
ev
=
{
0
};
ev
.
fd
=
fd
;
if
(
event
)
ev
.
epev
=
*
event
;
struct
epoll_event
*
pev
=
event
;
switch
(
op
)
{
case
EPOLL_CTL_ADD
:
{
flags
=
EV_ADD
;
ev
.
changed
=
1
;
}
break
;
case
EPOLL_CTL_MOD
:
{
flags
=
EV_ADD
;
ev
.
changed
=
2
;
}
break
;
case
EPOLL_CTL_DEL
:
{
// event is ignored
// pev points to registered epoll_event
pev
=
&
oev
->
epev
;
flags
=
EV_DELETE
;
ev
.
changed
=
3
;
}
break
;
default:
{
e
=
ENOTSUP
;
}
break
;
}
if
(
e
)
break
;
// udata will be delayed to be set
if
(
pev
->
events
&
(
EPOLLIN
|
EPOLLERR
|
EPOLLHUP
|
EPOLLRDHUP
))
{
flags
|=
EV_EOF
;
EV_SET64
(
&
krev
,
ev
.
fd
,
EVFILT_READ
,
flags
,
0
,
0
,
-
1
,
0
,
0
);
}
if
(
pev
->
events
&
EPOLLOUT
)
{
EV_SET64
(
&
kwev
,
ev
.
fd
,
EVFILT_WRITE
,
flags
,
0
,
0
,
-
1
,
0
,
0
);
}
// refresh registered evlist and changelist in a transaction way
if
(
eok_chgs_refresh
(
eok
,
oev
,
&
ev
,
&
krev
,
&
kwev
))
{
e
=
errno
;
A
(
e
,
"internal logic error"
);
break
;
}
eok
->
changed
=
1
;
eok_wakeup
(
eok
);
}
while
(
0
);
A
(
0
==
pthread_mutex_unlock
(
&
eok
->
lock
),
""
);
if
(
e
)
{
errno
=
e
;
return
-
1
;
}
return
0
;
}
static
struct
timespec
do_timespec_diff
(
struct
timespec
*
from
,
struct
timespec
*
to
);
int
epoll_wait
(
int
epfd
,
struct
epoll_event
*
events
,
int
maxevents
,
int
timeout
)
{
taos_block_sigalrm
();
int
e
=
0
;
if
(
!
events
)
{
errno
=
EINVAL
;
E
(
"epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed"
,
epfd
,
maxevents
,
timeout
);
return
-
1
;
}
if
(
maxevents
<=
0
)
{
errno
=
EINVAL
;
E
(
"epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed"
,
epfd
,
maxevents
,
timeout
);
return
-
1
;
}
struct
timespec
abstime
=
{
0
};
A
(
TIME_UTC
==
timespec_get
(
&
abstime
,
TIME_UTC
),
"internal logic error"
);
if
(
timeout
!=-
1
)
{
if
(
timeout
<
0
)
timeout
=
0
;
int64_t
t
=
abstime
.
tv_nsec
+
timeout
*
1000000
;
abstime
.
tv_sec
+=
t
/
1000000000
;
abstime
.
tv_nsec
=
t
%
1000000000
;
}
int
r
=
0
;
ep_over_kq_t
*
eok
=
eoks_find
(
epfd
);
if
(
!
eok
)
{
errno
=
EBADF
;
E
(
"epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed"
,
epfd
,
maxevents
,
timeout
);
errno
=
EBADF
;
return
-
1
;
}
int
cnts
=
0
;
A
(
0
==
pthread_mutex_lock
(
&
eok
->
lock
),
""
);
do
{
cnts
=
0
;
A
(
eok
->
waiting
==
0
,
"internal logic error"
);
struct
kevent64_s
*
eventslist
=
eok_alloc_eventslist
(
eok
,
maxevents
);
if
(
!
eventslist
)
{
e
=
ENOMEM
;
E
(
"epoll_waiting epfd:[%d], maxevents:[%d], timeout:[%d] failed"
,
epfd
,
maxevents
,
timeout
);
break
;
}
memset
(
eventslist
,
0
,
maxevents
*
sizeof
(
*
eventslist
));
struct
timespec
now
=
{
0
};
A
(
TIME_UTC
==
timespec_get
(
&
now
,
TIME_UTC
),
"internal logic error"
);
struct
timespec
to
=
do_timespec_diff
(
&
now
,
&
abstime
);
struct
timespec
*
pto
=
&
to
;
if
(
timeout
==-
1
)
{
pto
=
NULL
;
}
// taking the changelist
struct
kevent64_s
*
kchanges
=
eok
->
kchanges
;
int
nchanges
=
eok
->
nchanges
;
int
ichanges
=
eok
->
ichanges
;
// let outside world to add changes
eok
->
kchanges
=
NULL
;
eok
->
nchanges
=
0
;
eok
->
ichanges
=
0
;
eok
->
changed
=
0
;
eok
->
wakenup
=
0
;
eok
->
waiting
=
1
;
A
(
0
==
pthread_mutex_unlock
(
&
eok
->
lock
),
""
);
if
(
ichanges
>
0
)
{
D
(
"kevent64 epfd[%d] changing [%d] changes and waiting..."
,
eok
->
idx
,
ichanges
);
}
errno
=
0
;
r
=
kevent64
(
eok
->
kq
,
kchanges
,
ichanges
,
eventslist
,
maxevents
,
0
,
pto
);
e
=
errno
;
if
(
e
)
{
E
(
"kevent64 epfd[%d] waiting done, with r[%d]"
,
eok
->
idx
,
r
);
}
A
(
0
==
pthread_mutex_lock
(
&
eok
->
lock
),
""
);
eok
->
waiting
=
0
;
if
(
kchanges
)
{
if
(
eok
->
kchanges
==
NULL
)
{
// reuse
A
(
eok
->
nchanges
==
0
&&
eok
->
ichanges
==
0
,
"internal logic error"
);
eok
->
kchanges
=
kchanges
;
eok
->
nchanges
=
nchanges
;
}
else
{
free
(
kchanges
);
kchanges
=
NULL
;
}
nchanges
=
0
;
ichanges
=
0
;
}
if
(
r
==
0
)
{
A
(
timeout
!=-
1
,
"internal logic error"
);
}
for
(
int
i
=
0
;
i
<
r
;
++
i
)
{
struct
kevent64_s
*
kev
=
eventslist
+
i
;
A
(
kev
->
udata
&&
eok
->
evs_head
&&
eok
->
evs_tail
,
"internal logic error"
);
eok_event_t
*
ev
=
(
eok_event_t
*
)
kev
->
udata
;
A
(
kev
->
ident
==
ev
->
fd
,
"internal logic error"
);
if
(
kev
->
flags
&
EV_ERROR
)
{
D
(
"epfd[%d] error when processing change list for fd[%d], error[%s], kev_flags:[%04x:%s]"
,
epfd
,
ev
->
fd
,
strerror
(
kev
->
data
),
kev
->
flags
,
kev_flags_str
(
kev
->
flags
,
0
));
}
switch
(
kev
->
filter
)
{
case
EVFILT_READ
:
{
A
((
ev
->
epev
.
events
&
EPOLLIN
),
"internal logic errro"
);
if
(
ev
->
epev
.
data
.
ptr
==&
eok_dummy
)
{
// it's coming from wakeup socket pair
char
c
=
'\0'
;
A
(
1
==
recv
(
kev
->
ident
,
&
c
,
1
,
0
),
"internal logic error"
);
A
(
0
==
memcmp
(
&
c
,
"1"
,
1
),
"internal logic error"
);
D
(
"epfd[%d] wokenup"
,
epfd
);
continue
;
}
else
{
if
(
ev
->
changed
==
3
)
{
D
(
"epfd[%d] already requested to delete for fd[%d]"
,
epfd
,
ev
->
fd
);
// TODO: write a unit test for this case
// EV_DELETE?
continue
;
}
// converting to epoll_event
// we shall collect all kevents for the uniq fd into one epoll_evnt
// but currently, taos never use EPOLLOUT
// just let it this way for the moment
struct
epoll_event
pev
=
{
0
};
pev
.
data
.
ptr
=
ev
->
epev
.
data
.
ptr
;
pev
.
events
=
EPOLLIN
;
if
(
kev
->
flags
&
EV_EOF
)
{
// take all these as EOF for the moment
pev
.
events
|=
(
EPOLLHUP
|
EPOLLERR
|
EPOLLRDHUP
);
}
// rounded to what user care
pev
.
events
=
pev
.
events
&
ev
->
epev
.
events
;
D
(
"epfd[%d] events found for fd[%d]: [%04x:%s], which was registered: [%04x:%s], kev_flags: [%04x:%s]"
,
epfd
,
ev
->
fd
,
pev
.
events
,
events_str
(
pev
.
events
,
0
),
ev
->
epev
.
events
,
events_str
(
ev
->
epev
.
events
,
1
),
kev
->
flags
,
kev_flags_str
(
kev
->
flags
,
2
));
// now we get ev and store it
events
[
cnts
++
]
=
pev
;
}
}
break
;
case
EVFILT_WRITE
:
{
A
(
0
,
"not implemented yet"
);
}
break
;
default:
{
A
(
0
,
"internal logic error"
);
}
break
;
}
}
if
(
r
>=
0
)
{
// we can safely rule out delete-requested-events from the regitered evlists
// if only changelist are correctly registered
eok_event_t
*
p
=
eok
->
evs_head
;
while
(
p
)
{
eok_event_t
*
next
=
p
->
next
;
if
(
p
->
changed
==
3
)
{
D
(
"epfd[%d] removing registered event for fd[%d]: [%04x:%s]"
,
epfd
,
p
->
fd
,
p
->
epev
.
events
,
events_str
(
p
->
epev
.
events
,
0
));
eok_free_ev
(
eok
,
p
);
}
p
=
next
;
}
}
if
(
cnts
==
0
)
{
// if no user-cared-events is up
// we check to see if time is up
if
(
timeout
!=-
1
)
{
A
(
TIME_UTC
==
timespec_get
(
&
now
,
TIME_UTC
),
"internal logic error"
);
to
=
do_timespec_diff
(
&
now
,
&
abstime
);
if
(
to
.
tv_sec
==
0
&&
to
.
tv_nsec
==
0
)
break
;
}
// time is not up yet, continue loop
}
}
while
(
cnts
==
0
);
if
(
cnts
>
0
)
{
D
(
"kevent64 epfd[%d] waiting done with [%d] events"
,
epfd
,
cnts
);
}
A
(
0
==
pthread_mutex_unlock
(
&
eok
->
lock
),
""
);
if
(
e
)
{
errno
=
e
;
E
(
"epfd[%d] epoll_wait failed"
,
epfd
);
return
-
1
;
}
// tell user how many events are valid
return
cnts
;
}
static
struct
timespec
do_timespec_diff
(
struct
timespec
*
from
,
struct
timespec
*
to
)
{
struct
timespec
delta
;
delta
.
tv_sec
=
to
->
tv_sec
-
from
->
tv_sec
;
delta
.
tv_nsec
=
to
->
tv_nsec
-
from
->
tv_nsec
;
// norm and round up
while
(
delta
.
tv_nsec
<
0
)
{
delta
.
tv_sec
-=
1
;
delta
.
tv_nsec
+=
1000000000
;
}
if
(
delta
.
tv_sec
<
0
)
{
delta
.
tv_sec
=
0
;
delta
.
tv_nsec
=
0
;
}
return
delta
;
}
int
epoll_close
(
int
epfd
)
{
D
(
"epoll_closing epfd: [%d]"
,
epfd
);
ep_over_kq_t
*
eok
=
eoks_find
(
epfd
);
if
(
!
eok
)
{
errno
=
EBADF
;
return
-
1
;
}
A
(
0
==
pthread_mutex_lock
(
&
eok
->
lock
),
""
);
do
{
// panic if it would be double-closed
A
(
eok
->
stopping
==
0
,
"internal logic error"
);
eok
->
stopping
=
1
;
// panic if epoll_wait is pending
A
(
eok
->
waiting
==
0
,
"internal logic error"
);
if
(
eok
->
kq
!=-
1
)
{
close
(
eok
->
kq
);
eok
->
kq
=
-
1
;
}
}
while
(
0
);
A
(
0
==
pthread_mutex_unlock
(
&
eok
->
lock
),
""
);
eoks_free
(
eok
);
return
0
;
}
static
struct
kevent64_s
*
eok_alloc_eventslist
(
ep_over_kq_t
*
eok
,
int
maxevents
)
{
if
(
maxevents
<=
eok
->
nevslist
)
return
eok
->
kevslist
;
struct
kevent64_s
*
p
=
(
struct
kevent64_s
*
)
realloc
(
eok
->
kevslist
,
sizeof
(
*
p
)
*
maxevents
);
if
(
!
p
)
return
NULL
;
eok
->
kevslist
=
p
;
eok
->
nevslist
=
maxevents
;
return
p
;
}
static
eok_event_t
*
eok_find_ev
(
ep_over_kq_t
*
eok
,
int
fd
)
{
eok_event_t
*
p
=
eok
->
evs_head
;
while
(
p
)
{
if
(
p
->
fd
==
fd
)
return
p
;
p
=
p
->
next
;
}
errno
=
ENOENT
;
return
NULL
;
}
static
eok_event_t
*
eok_calloc_ev
(
ep_over_kq_t
*
eok
)
{
eok_event_t
*
p
=
NULL
;
if
(
eok
->
evs_free
)
{
p
=
eok
->
evs_free
;
eok
->
evs_free
=
p
->
next
;
p
->
next
=
NULL
;
A
(
p
->
prev
==
NULL
,
"internal logic error"
);
}
else
{
p
=
(
eok_event_t
*
)
calloc
(
1
,
sizeof
(
*
p
));
if
(
!
p
)
return
NULL
;
A
(
p
->
next
==
NULL
,
"internal logic error"
);
A
(
p
->
prev
==
NULL
,
"internal logic error"
);
}
// dirty link
p
->
prev
=
eok
->
evs_tail
;
if
(
eok
->
evs_tail
)
eok
->
evs_tail
->
next
=
p
;
else
eok
->
evs_head
=
p
;
eok
->
evs_tail
=
p
;
eok
->
evs_count
+=
1
;
return
p
;
}
static
void
eok_free_ev
(
ep_over_kq_t
*
eok
,
eok_event_t
*
ev
)
{
if
(
ev
->
prev
)
ev
->
prev
->
next
=
ev
->
next
;
else
eok
->
evs_head
=
ev
->
next
;
if
(
ev
->
next
)
ev
->
next
->
prev
=
ev
->
prev
;
else
eok
->
evs_tail
=
ev
->
prev
;
ev
->
prev
=
NULL
;
ev
->
next
=
eok
->
evs_free
;
eok
->
evs_free
=
ev
;
eok
->
evs_count
-=
1
;
}
static
void
eok_wakeup
(
ep_over_kq_t
*
eok
)
{
if
(
!
eok
->
waiting
)
return
;
if
(
eok
->
wakenup
)
return
;
eok
->
wakenup
=
1
;
A
(
1
==
send
(
eok
->
sv
[
1
],
"1"
,
1
,
0
),
""
);
}
static
int
eok_chgs_refresh
(
ep_over_kq_t
*
eok
,
eok_event_t
*
oev
,
eok_event_t
*
ev
,
struct
kevent64_s
*
krev
,
struct
kevent64_s
*
kwev
)
{
if
(
!
oev
)
oev
=
eok_calloc_ev
(
eok
);
if
(
!
oev
)
return
-
1
;
int
n
=
0
;
if
(
krev
->
ident
==
ev
->
fd
)
++
n
;
if
(
kwev
->
ident
==
ev
->
fd
)
++
n
;
A
(
n
,
"internal logic error"
);
if
(
eok
->
ichanges
+
n
>
eok
->
nchanges
)
{
struct
kevent64_s
*
p
=
(
struct
kevent64_s
*
)
realloc
(
eok
->
kchanges
,
sizeof
(
*
p
)
*
(
eok
->
nchanges
+
10
));
if
(
!
p
)
{
if
(
ev
->
changed
==
1
)
{
// roll back
A
(
oev
,
"internal logic error"
);
eok_free_ev
(
eok
,
oev
);
}
errno
=
ENOMEM
;
return
-
1
;
}
eok
->
kchanges
=
p
;
eok
->
nchanges
+=
10
;
}
// copy to registered event slot
oev
->
fd
=
ev
->
fd
;
if
(
ev
->
changed
!=
3
)
{
// if add/mod, copy epoll_event
oev
->
epev
=
ev
->
epev
;
}
oev
->
changed
=
ev
->
changed
;
// copy to changes list
n
=
0
;
if
(
krev
->
ident
==
ev
->
fd
)
{
krev
->
udata
=
(
uint64_t
)
oev
;
eok
->
kchanges
[
eok
->
ichanges
++
]
=
*
krev
;
++
n
;
}
if
(
kwev
->
ident
==
ev
->
fd
)
{
kwev
->
udata
=
(
uint64_t
)
oev
;
eok
->
kchanges
[
eok
->
ichanges
++
]
=
*
kwev
;
++
n
;
}
D
(
"epfd[%d]: add #changes[%d] for fd[%d], and now #changes/registers [%d/%d]"
,
eok
->
idx
,
n
,
ev
->
fd
,
eok
->
ichanges
,
eok
->
evs_count
);
return
0
;
}
static
ep_over_kq_t
*
eoks_alloc
(
void
)
{
ep_over_kq_t
*
eok
=
NULL
;
A
(
0
==
pthread_mutex_lock
(
&
eoks
.
lock
),
""
);
do
{
if
(
eoks
.
eoks_free_list
)
{
eok
=
eoks
.
eoks_free_list
;
eoks
.
eoks_free_list
=
eok
->
next
;
A
(
eoks
.
eoks
,
"internal logic error"
);
A
(
eok
->
idx
>=
0
&&
eok
->
idx
<
eoks
.
neoks
,
"internal logic error"
);
A
(
*
(
eoks
.
eoks
+
eok
->
idx
)
==
NULL
,
"internal logic error"
);
*
(
eoks
.
eoks
+
eok
->
idx
)
=
eok
;
eok
->
next
=
NULL
;
eok
->
stopping
=
0
;
break
;
}
eok
=
(
ep_over_kq_t
*
)
calloc
(
1
,
sizeof
(
*
eok
));
if
(
!
eok
)
break
;
eok
->
idx
=
-
1
;
ep_over_kq_t
**
ar
=
(
ep_over_kq_t
**
)
realloc
(
eoks
.
eoks
,
sizeof
(
**
ar
)
*
(
eoks
.
neoks
+
1
));
if
(
!
ar
)
break
;
eoks
.
eoks
=
ar
;
*
(
eoks
.
eoks
+
eoks
.
neoks
)
=
eok
;
eok
->
idx
=
eoks
.
neoks
;
eok
->
kq
=
-
1
;
eok
->
sv
[
0
]
=
-
1
;
eok
->
sv
[
1
]
=
-
1
;
eoks
.
neoks
+=
1
;
}
while
(
0
);
A
(
0
==
pthread_mutex_unlock
(
&
eoks
.
lock
),
""
);
if
(
!
eok
)
{
errno
=
ENOMEM
;
return
NULL
;
}
if
(
eok
->
idx
==-
1
)
{
free
(
eok
);
errno
=
ENOMEM
;
return
NULL
;
}
if
(
eok
->
lock_valid
)
{
return
eok
;
}
if
(
0
==
pthread_mutex_init
(
&
eok
->
lock
,
NULL
))
{
eok
->
lock_valid
=
1
;
return
eok
;
}
eoks_free
(
eok
);
errno
=
ENOMEM
;
return
NULL
;
}
static
void
eoks_free
(
ep_over_kq_t
*
eok
)
{
A
(
0
==
pthread_mutex_lock
(
&
eoks
.
lock
),
""
);
do
{
A
(
eok
->
idx
>=
0
&&
eok
->
idx
<
eoks
.
neoks
,
"internal logic error"
);
A
(
eok
->
next
==
NULL
,
"internal logic error"
);
// leave eok->kchanges as is
A
(
eok
->
ichanges
==
0
,
"internal logic error"
);
A
(
eok
->
waiting
==
0
,
"internal logic error"
);
eok_event_t
*
ev
=
eok
->
evs_head
;
int
sv_closed
=
0
;
while
(
ev
)
{
eok_event_t
*
next
=
ev
->
next
;
if
(
ev
->
fd
==
eok
->
sv
[
0
])
{
// fd is critical system resource
A
(
sv_closed
==
0
,
"internal logic error"
);
close
(
eok
->
sv
[
0
]);
eok
->
sv
[
0
]
=
-
1
;
close
(
eok
->
sv
[
1
]);
eok
->
sv
[
1
]
=
-
1
;
eok_free_ev
(
eok
,
ev
);
}
else
{
// user forget calling epoll_ctl(EPOLL_CTL_DEL) before calling epoll_close/close?
// calling close(ev->fd) here smells really bad
#ifndef BALANCE_CHECK_WHEN_CLOSE
// we just let it be and reclaim ev
eok_free_ev
(
eok
,
ev
);
#else
// panic otherwise, if BALANCE_CHECK_WHEN_CLOSE is defined
A
(
eok
->
evs_head
==
NULL
&&
eok
->
evs_tail
==
NULL
&&
eok
->
evs_count
==
0
,
"epfd[%d] fd[%d]: internal logic error: have you epoll_ctl(EPOLL_CTL_DEL) everything before calling epoll_close?"
,
eok
->
idx
,
ev
->
fd
);
#endif
}
ev
=
next
;
}
// if (eok->evs_count==1) {
// A(eok->evs_head && eok->evs_tail && eok->evs_head==eok->evs_tail, "internal logic error");
// A(eok->evs_head->fd==eok->sv[0] && eok->sv[0]!=-1 && eok->sv[1]!=-1, "internal logic error");
// // fd is critical system resource
// close(eok->sv[0]);
// eok->sv[0] = -1;
// close(eok->sv[1]);
// eok->sv[1] = -1;
// eok_free_ev(eok, eok->evs_head);
// }
A
(
eok
->
evs_head
==
NULL
&&
eok
->
evs_tail
==
NULL
&&
eok
->
evs_count
==
0
,
"internal logic error: have you epoll_ctl(EPOLL_CTL_DEL) everything before calling epoll_close?"
);
A
(
eok
->
sv
[
0
]
==-
1
&&
eok
->
sv
[
1
]
==-
1
,
"internal logic error"
);
if
(
eok
->
kq
!=-
1
)
{
close
(
eok
->
kq
);
eok
->
kq
=
-
1
;
}
eok
->
next
=
eoks
.
eoks_free_list
;
eoks
.
eoks_free_list
=
eok
;
*
(
eoks
.
eoks
+
eok
->
idx
)
=
NULL
;
}
while
(
0
);
A
(
0
==
pthread_mutex_unlock
(
&
eoks
.
lock
),
""
);
}
static
ep_over_kq_t
*
eoks_find
(
int
epfd
)
{
ep_over_kq_t
*
eok
=
NULL
;
A
(
0
==
pthread_mutex_lock
(
&
eoks
.
lock
),
""
);
do
{
if
(
epfd
<
0
||
epfd
>=
eoks
.
neoks
)
{
break
;
}
A
(
eoks
.
eoks
,
"internal logic error"
);
eok
=
*
(
eoks
.
eoks
+
epfd
);
A
(
eok
->
next
==
NULL
,
"internal logic error"
);
A
(
eok
->
lock_valid
,
"internal logic error"
);
}
while
(
0
);
A
(
0
==
pthread_mutex_unlock
(
&
eoks
.
lock
),
""
);
return
eok
;
}
src/os/src/detail/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -13,3 +13,4 @@ TARGET_LINK_LIBRARIES(osdetail os)
IF
(
TD_ARM_32 OR TD_LINUX_32
)
TARGET_LINK_LIBRARIES
(
osdetail atomic
)
ENDIF
()
src/os/src/detail/osSocket.c
浏览文件 @
27351366
src/os/src/detail/osTimer.c
浏览文件 @
27351366
...
...
@@ -116,6 +116,9 @@ void taosUninitTimer() {
pthread_sigmask(SIG_BLOCK, &set, NULL);
*/
void
taosMsleep
(
int
mseconds
)
{
#ifdef __APPLE__
taos_block_sigalrm
();
#endif // __APPLE__
#if 1
usleep
(
mseconds
*
1000
);
#else
...
...
@@ -136,6 +139,7 @@ void taosMsleep(int mseconds) {
/* pthread_sigmask(SIG_UNBLOCK, &set, NULL); */
#endif
}
#endif
src/plugins/http/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -23,3 +23,18 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES
(
http admin
)
ENDIF
()
ENDIF
()
IF
(
TD_DARWIN
)
ADD_LIBRARY
(
http
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
http z
)
IF
(
TD_SOMODE_STATIC
)
TARGET_LINK_LIBRARIES
(
http taos_static
)
ELSE
()
TARGET_LINK_LIBRARIES
(
http taos
)
ENDIF
()
IF
(
TD_ADMIN
)
TARGET_LINK_LIBRARIES
(
http admin
)
ENDIF
()
ENDIF
()
src/plugins/http/src/httpServer.c
浏览文件 @
27351366
...
...
@@ -31,11 +31,36 @@
static
bool
httpReadData
(
HttpContext
*
pContext
);
#ifdef __APPLE__
static
int
sv_dummy
=
0
;
#endif // __APPLE__
static
void
httpStopThread
(
HttpThread
*
pThread
)
{
pThread
->
stop
=
true
;
// signal the thread to stop, try graceful method first,
// and use pthread_cancel when failed
#ifdef __APPLE__
int
sv
[
2
];
sv
[
0
]
=
sv
[
1
]
=
-
1
;
int
r
=
socketpair
(
PF_LOCAL
,
SOCK_STREAM
,
0
,
sv
);
do
{
if
(
r
)
break
;
struct
epoll_event
ev
=
{
0
};
ev
.
events
=
EPOLLIN
;
ev
.
data
.
ptr
=
&
sv_dummy
;
pThread
->
stop
=
true
;
r
=
epoll_ctl
(
pThread
->
pollFd
,
EPOLL_CTL_ADD
,
sv
[
0
],
&
ev
);
if
(
r
)
break
;
if
(
1
!=
send
(
sv
[
1
],
"1"
,
1
,
0
))
{
r
=
-
1
;
break
;
}
}
while
(
0
);
if
(
r
)
{
pthread_cancel
(
pThread
->
thread
);
}
#else // __APPLE__
struct
epoll_event
event
=
{
.
events
=
EPOLLIN
};
eventfd_t
fd
=
eventfd
(
1
,
0
);
if
(
fd
==
-
1
)
{
...
...
@@ -46,13 +71,29 @@ static void httpStopThread(HttpThread* pThread) {
httpError
(
"%s, failed to call epoll_ctl, will call pthread_cancel instead, which may result in data corruption: %s"
,
pThread
->
label
,
strerror
(
errno
));
pthread_cancel
(
pThread
->
thread
);
}
#endif // __APPLE__
pthread_join
(
pThread
->
thread
,
NULL
);
#ifdef __APPLE__
if
(
sv
[
0
]
!=-
1
)
{
close
(
sv
[
0
]);
sv
[
0
]
=
-
1
;
}
if
(
sv
[
1
]
!=-
1
)
{
close
(
sv
[
1
]);
sv
[
1
]
=
-
1
;
}
#else // __APPLE__
if
(
fd
!=
-
1
)
{
close
(
fd
);
}
#endif // __APPLE__
#ifdef __APPLE__
epoll_close
(
pThread
->
pollFd
);
#else
close
(
pThread
->
pollFd
);
#endif
pthread_mutex_destroy
(
&
(
pThread
->
threadMutex
));
}
...
...
@@ -96,6 +137,15 @@ static void httpProcessHttpData(void *param) {
if
(
fdNum
<=
0
)
continue
;
for
(
int32_t
i
=
0
;
i
<
fdNum
;
++
i
)
{
#ifdef __APPLE__
if
(
events
[
i
].
data
.
ptr
==
&
sv_dummy
)
{
// no need to drain the recv buffer of sv[0]
// since there's only one time to send at most 1 byte to sv[0]
// btw, pThread->stop shall be already set, thus never reached here
httpDebug
(
"if you see this line, there's internal logic error"
);
continue
;
}
#endif // __APPLE__
pContext
=
httpGetContext
(
events
[
i
].
data
.
ptr
);
if
(
pContext
==
NULL
)
{
httpError
(
"context:%p, is already released, close connect"
,
events
[
i
].
data
.
ptr
);
...
...
src/plugins/monitor/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -15,3 +15,13 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES
(
monitor taos
)
ENDIF
()
ENDIF
()
IF
(
TD_DARWIN
)
ADD_LIBRARY
(
monitor
${
SRC
}
)
IF
(
TD_SOMODE_STATIC
)
TARGET_LINK_LIBRARIES
(
monitor taos_static
)
ELSE
()
TARGET_LINK_LIBRARIES
(
monitor taos
)
ENDIF
()
ENDIF
()
src/plugins/mqtt/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -18,3 +18,15 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES
(
mqtt taos
)
ENDIF
()
ENDIF
()
IF
(
TD_DARWIN
)
ADD_LIBRARY
(
mqtt
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
mqtt cJson mqttc
)
IF
(
TD_SOMODE_STATIC
)
TARGET_LINK_LIBRARIES
(
mqtt taos_static
)
ELSE
()
TARGET_LINK_LIBRARIES
(
mqtt taos
)
ENDIF
()
ENDIF
()
src/query/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -14,3 +14,8 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES
(
query m rt
)
ADD_SUBDIRECTORY
(
tests
)
ENDIF
()
IF
(
TD_DARWIN
)
TARGET_LINK_LIBRARIES
(
query m
)
ADD_SUBDIRECTORY
(
tests
)
ENDIF
()
src/rpc/src/rpcTcp.c
浏览文件 @
27351366
...
...
@@ -307,7 +307,14 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
int
code
=
pthread_create
(
&
(
pThreadObj
->
thread
),
&
thattr
,
taosProcessTcpData
,
(
void
*
)(
pThreadObj
));
pthread_attr_destroy
(
&
thattr
);
if
(
code
!=
0
)
{
#ifdef __APPLE__
if
(
pThreadObj
->
pollFd
!=-
1
)
{
epoll_close
(
pThreadObj
->
pollFd
);
pThreadObj
->
pollFd
=
-
1
;
}
#else // __APPLE__
taosCloseSocket
(
pThreadObj
->
pollFd
);
#endif // __APPLE__
free
(
pThreadObj
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
tError
(
"%s failed to create TCP read data thread(%s)"
,
label
,
strerror
(
errno
));
...
...
@@ -471,6 +478,9 @@ static void *taosProcessTcpData(void *param) {
struct
epoll_event
events
[
maxEvents
];
SRecvInfo
recvInfo
;
#ifdef __APPLE__
taos_block_sigalrm
();
#endif // __APPLE__
while
(
1
)
{
int
fdNum
=
epoll_wait
(
pThreadObj
->
pollFd
,
events
,
maxEvents
,
TAOS_EPOLL_WAIT_TIME
);
if
(
pThreadObj
->
stop
)
{
...
...
@@ -512,7 +522,14 @@ static void *taosProcessTcpData(void *param) {
if
(
pThreadObj
->
stop
)
break
;
}
#ifdef __APPLE__
if
(
pThreadObj
->
pollFd
>=
0
)
{
epoll_close
(
pThreadObj
->
pollFd
);
pThreadObj
->
pollFd
=
-
1
;
}
#else // __APPLE__
if
(
pThreadObj
->
pollFd
>=
0
)
taosCloseSocket
(
pThreadObj
->
pollFd
);
#endif // __APPLE__
while
(
pThreadObj
->
pHead
)
{
SFdObj
*
pFdObj
=
pThreadObj
->
pHead
;
...
...
src/rpc/test/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -16,3 +16,17 @@ IF (TD_LINUX)
ADD_EXECUTABLE
(
rserver
${
SERVER_SRC
}
)
TARGET_LINK_LIBRARIES
(
rserver trpc
)
ENDIF
()
IF
(
TD_DARWIN
)
LIST
(
APPEND CLIENT_SRC ./rclient.c
)
ADD_EXECUTABLE
(
rclient
${
CLIENT_SRC
}
)
TARGET_LINK_LIBRARIES
(
rclient trpc
)
LIST
(
APPEND SCLIENT_SRC ./rsclient.c
)
ADD_EXECUTABLE
(
rsclient
${
SCLIENT_SRC
}
)
TARGET_LINK_LIBRARIES
(
rsclient trpc
)
LIST
(
APPEND SERVER_SRC ./rserver.c
)
ADD_EXECUTABLE
(
rserver
${
SERVER_SRC
}
)
TARGET_LINK_LIBRARIES
(
rserver trpc
)
ENDIF
()
src/sync/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -16,3 +16,16 @@ IF (TD_LINUX)
#ADD_SUBDIRECTORY(test)
ENDIF
()
IF
(
TD_DARWIN
)
LIST
(
REMOVE_ITEM SRC src/syncArbitrator.c
)
ADD_LIBRARY
(
sync
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
sync tutil pthread common
)
LIST
(
APPEND BIN_SRC src/syncArbitrator.c
)
LIST
(
APPEND BIN_SRC src/syncTcp.c
)
ADD_EXECUTABLE
(
tarbitrator
${
BIN_SRC
}
)
TARGET_LINK_LIBRARIES
(
tarbitrator sync common osdetail tutil
)
#ADD_SUBDIRECTORY(test)
ENDIF
()
src/sync/src/syncRetrieve.c
浏览文件 @
27351366
...
...
@@ -14,7 +14,7 @@
*/
#define _DEFAULT_SOURCE
#include <sys/inotify.h>
//
#include <sys/inotify.h>
#include "os.h"
#include "taoserror.h"
#include "tlog.h"
...
...
src/sync/src/syncTcp.c
浏览文件 @
27351366
...
...
@@ -233,7 +233,11 @@ static void *syncProcessTcpData(void *param) {
sDebug
(
"%p TCP epoll thread exits"
,
pThread
);
#ifdef __APPLE__
epoll_close
(
pThread
->
pollFd
);
#else // __APPLE__
close
(
pThread
->
pollFd
);
#endif // __APPLE__
tfree
(
pThread
);
tfree
(
buffer
);
return
NULL
;
...
...
@@ -290,7 +294,11 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
pthread_attr_destroy
(
&
thattr
);
if
(
ret
!=
0
)
{
#ifdef __APPLE__
epoll_close
(
pThread
->
pollFd
);
#else // __APPLE__
close
(
pThread
->
pollFd
);
#endif // __APPLE__
tfree
(
pThread
);
return
NULL
;
}
...
...
src/sync/test/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -13,4 +13,3 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES
(
syncServer sync trpc common
)
ENDIF
()
src/tsdb/inc/tsdbMain.h
浏览文件 @
27351366
...
...
@@ -233,7 +233,11 @@ typedef struct {
SMemTable
*
mem
;
SMemTable
*
imem
;
STsdbFileH
*
tsdbFileH
;
#ifdef __APPLE__
sem_t
*
readyToCommit
;
#else // __APPLE__
sem_t
readyToCommit
;
#endif // __APPLE__
pthread_mutex_t
mutex
;
bool
repoLocked
;
int32_t
code
;
// Commit code
...
...
src/tsdb/src/tsdbCommit.c
浏览文件 @
27351366
...
...
@@ -166,7 +166,11 @@ static void tsdbEndCommit(STsdbRepo *pRepo, int eno) {
pRepo
->
imem
=
NULL
;
tsdbUnlockRepo
(
pRepo
);
tsdbUnRefMemTable
(
pRepo
,
pIMem
);
#ifdef __APPLE__
sem_post
(
pRepo
->
readyToCommit
);
#else // __APPLE__
sem_post
(
&
(
pRepo
->
readyToCommit
));
#endif // __APPLE__
}
static
int
tsdbHasDataToCommit
(
SCommitIter
*
iters
,
int
nIters
,
TSKEY
minKey
,
TSKEY
maxKey
)
{
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
27351366
...
...
@@ -146,7 +146,11 @@ int tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
if
(
toCommit
)
{
tsdbAsyncCommit
(
pRepo
);
#ifdef __APPLE__
sem_wait
(
pRepo
->
readyToCommit
);
#else // __APPLE__
sem_wait
(
&
(
pRepo
->
readyToCommit
));
#endif // __APPLE__
terrno
=
pRepo
->
code
;
}
tsdbUnRefMemTable
(
pRepo
,
pRepo
->
mem
);
...
...
@@ -643,11 +647,21 @@ static STsdbRepo *tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg) {
goto
_err
;
}
#ifdef __APPLE__
pRepo
->
readyToCommit
=
sem_open
(
NULL
,
O_CREAT
,
0644
,
1
);
if
(
pRepo
->
readyToCommit
==
SEM_FAILED
)
{
code
=
errno
;
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_err
;
}
#else // __APPLE__
code
=
sem_init
(
&
(
pRepo
->
readyToCommit
),
0
,
1
);
if
(
code
!=
0
)
{
code
=
errno
;
terrno
=
TAOS_SYSTEM_ERROR
(
code
);
goto
_err
;
}
#endif // __APPLE__
pRepo
->
repoLocked
=
false
;
...
...
@@ -693,7 +707,11 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
// tsdbFreeMemTable(pRepo->mem);
// tsdbFreeMemTable(pRepo->imem);
tfree
(
pRepo
->
rootDir
);
#ifdef __APPLE__
sem_close
(
pRepo
->
readyToCommit
);
#else // __APPLE__
sem_destroy
(
&
(
pRepo
->
readyToCommit
));
#endif // __APPLE__
pthread_mutex_destroy
(
&
pRepo
->
mutex
);
free
(
pRepo
);
}
...
...
src/tsdb/src/tsdbMemTable.c
浏览文件 @
27351366
...
...
@@ -207,7 +207,11 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
int
tsdbAsyncCommit
(
STsdbRepo
*
pRepo
)
{
if
(
pRepo
->
mem
==
NULL
)
return
0
;
#ifdef __APPLE__
sem_wait
(
pRepo
->
readyToCommit
);
#else // __APPLE__
sem_wait
(
&
(
pRepo
->
readyToCommit
));
#endif // __APPLE__
ASSERT
(
pRepo
->
imem
==
NULL
);
...
...
@@ -229,8 +233,13 @@ int tsdbSyncCommit(TSDB_REPO_T *repo) {
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
tsdbAsyncCommit
(
pRepo
);
#ifdef __APPLE__
sem_wait
(
pRepo
->
readyToCommit
);
sem_post
(
pRepo
->
readyToCommit
);
#else // __APPLE__
sem_wait
(
&
(
pRepo
->
readyToCommit
));
sem_post
(
&
(
pRepo
->
readyToCommit
));
#endif // __APPLE__
if
(
pRepo
->
code
!=
TSDB_CODE_SUCCESS
)
{
terrno
=
pRepo
->
code
;
...
...
src/util/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -28,5 +28,6 @@ IF (TD_LINUX)
ELSEIF
(
TD_WINDOWS
)
TARGET_LINK_LIBRARIES
(
tutil iconv regex winmm IPHLPAPI ws2_32 wepoll
)
ELSEIF
(
TD_DARWIN
)
TARGET_LINK_LIBRARIES
(
tutil m
)
TARGET_LINK_LIBRARIES
(
tutil iconv
)
ENDIF
()
src/util/src/tsocket.c
浏览文件 @
27351366
...
...
@@ -354,6 +354,8 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
return
-
1
;
}
#ifndef __APPLE__
// all fails on macosx
int32_t
probes
=
3
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_TCP
,
TCP_KEEPCNT
,
(
void
*
)
&
probes
,
sizeof
(
probes
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPCNT failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
...
...
@@ -374,6 +376,7 @@ int32_t taosKeepTcpAlive(SOCKET sockFd) {
taosCloseSocket
(
sockFd
);
return
-
1
;
}
#endif // __APPLE__
int32_t
nodelay
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
void
*
)
&
nodelay
,
sizeof
(
nodelay
))
<
0
)
{
...
...
src/vnode/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -13,3 +13,9 @@ IF (TD_LINUX)
ADD_LIBRARY
(
vnode
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
vnode tsdb tcq
)
ENDIF
()
IF
(
TD_DARWIN
)
ADD_LIBRARY
(
vnode
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
vnode tsdb tcq
)
ENDIF
()
src/wal/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -9,3 +9,9 @@ IF (TD_LINUX)
TARGET_LINK_LIBRARIES
(
twal tutil common
)
ADD_SUBDIRECTORY
(
test
)
ENDIF
()
IF
(
TD_DARWIN
)
ADD_LIBRARY
(
twal
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
twal tutil common
)
ADD_SUBDIRECTORY
(
test
)
ENDIF
()
src/wal/test/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -10,4 +10,12 @@ IF (TD_LINUX)
ENDIF
()
IF
(
TD_DARWIN
)
INCLUDE_DIRECTORIES
(
../inc
)
LIST
(
APPEND WALTEST_SRC ./waltest.c
)
ADD_EXECUTABLE
(
waltest
${
WALTEST_SRC
}
)
TARGET_LINK_LIBRARIES
(
waltest twal osdetail tutil
)
ENDIF
()
tests/comparisonTest/tdengine/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -5,3 +5,8 @@ IF (TD_LINUX)
add_executable
(
tdengineTest tdengineTest.c
)
target_link_libraries
(
tdengineTest taos_static tutil common pthread
)
ENDIF
()
IF
(
TD_DARWIN
)
add_executable
(
tdengineTest tdengineTest.c
)
target_link_libraries
(
tdengineTest taos_static tutil common pthread
)
ENDIF
()
tests/examples/c/CMakeLists.txt
浏览文件 @
27351366
...
...
@@ -5,4 +5,14 @@ IF (TD_LINUX)
AUX_SOURCE_DIRECTORY
(
. SRC
)
ADD_EXECUTABLE
(
demo apitest.c
)
TARGET_LINK_LIBRARIES
(
demo taos_static trpc tutil pthread
)
ADD_EXECUTABLE
(
epoll epoll.c
)
TARGET_LINK_LIBRARIES
(
epoll taos_static trpc tutil pthread
)
ENDIF
()
IF
(
TD_DARWIN
)
INCLUDE_DIRECTORIES
(
.
${
TD_COMMUNITY_DIR
}
/src/inc
${
TD_COMMUNITY_DIR
}
/src/client/inc
${
TD_COMMUNITY_DIR
}
/inc
)
AUX_SOURCE_DIRECTORY
(
. SRC
)
ADD_EXECUTABLE
(
demo demo.c
)
TARGET_LINK_LIBRARIES
(
demo taos_static trpc tutil pthread
)
ADD_EXECUTABLE
(
epoll epoll.c
)
TARGET_LINK_LIBRARIES
(
epoll taos_static trpc tutil pthread
)
ENDIF
()
tests/examples/c/demo.c
浏览文件 @
27351366
...
...
@@ -86,7 +86,7 @@ void Test(TAOS *taos, char *qstr, int index) {
int
i
=
0
;
for
(
i
=
0
;
i
<
10
;
++
i
)
{
sprintf
(
qstr
,
"insert into m1 values (%"
PRId64
", %d, %d, %d, %d, %f, %lf, '%s')"
,
1546300800000
+
i
*
1000
,
i
,
i
,
i
,
i
*
10000000
,
i
*
1
.
0
,
i
*
2
.
0
,
"hello"
);
sprintf
(
qstr
,
"insert into m1 values (%"
PRId64
", %d, %d, %d, %d, %f, %lf, '%s')"
,
(
uint64_t
)(
1546300800000
+
i
*
1000
)
,
i
,
i
,
i
,
i
*
10000000
,
i
*
1
.
0
,
i
*
2
.
0
,
"hello"
);
printf
(
"qstr: %s
\n
"
,
qstr
);
// note: how do you wanna do if taos_query returns non-NULL
...
...
tests/examples/c/epoll.c
0 → 100644
浏览文件 @
27351366
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifdef __APPLE__
#include "eok.h"
#else // __APPLE__
#include <sys/epoll.h>
#endif // __APPLE__
#include <sys/types.h>
#include <sys/time.h>
#include <sys/socket.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <errno.h>
#include <string.h>
#include <arpa/inet.h>
#include <libgen.h>
#define D(fmt, ...) fprintf(stderr, "%s[%d]%s(): " fmt "\n", basename(__FILE__), __LINE__, __func__, ##__VA_ARGS__)
#define A(statement, fmt, ...) do { \
if (statement) break; \
fprintf(stderr, "%s[%d]%s(): assert [%s] failed: %d[%s]: " fmt "\n", \
basename(__FILE__), __LINE__, __func__, \
#statement, errno, strerror(errno), \
##__VA_ARGS__); \
abort(); \
} while (0)
#define E(fmt, ...) do { \
fprintf(stderr, "%s[%d]%s(): %d[%s]: " fmt "\n", \
basename(__FILE__), __LINE__, __func__, \
errno, strerror(errno), \
##__VA_ARGS__); \
} while (0)
typedef
struct
ep_s
ep_t
;
struct
ep_s
{
int
ep
;
pthread_mutex_t
lock
;
int
sv
[
2
];
// 0 for read, 1 for write;
pthread_t
thread
;
volatile
unsigned
int
stopping
:
1
;
volatile
unsigned
int
waiting
:
1
;
volatile
unsigned
int
wakenup
:
1
;
};
static
int
ep_dummy
=
0
;
static
ep_t
*
ep_create
(
void
);
static
void
ep_destroy
(
ep_t
*
ep
);
static
void
*
routine
(
void
*
arg
);
static
int
open_connect
(
unsigned
short
port
);
static
int
open_listen
(
unsigned
short
port
);
typedef
struct
client_s
client_t
;
struct
client_s
{
int
skt
;
void
(
*
on_event
)(
ep_t
*
ep
,
struct
epoll_event
*
events
,
client_t
*
client
);
volatile
unsigned
int
state
;
// 1: listenning; 2: connected
};
static
void
echo_event
(
ep_t
*
ep
,
struct
epoll_event
*
ev
,
client_t
*
client
);
int
main
(
int
argc
,
char
*
argv
[])
{
ep_t
*
ep
=
ep_create
();
A
(
ep
,
"failed"
);
int
skt
=
open_connect
(
6789
);
if
(
skt
!=-
1
)
{
client_t
*
client
=
(
client_t
*
)
calloc
(
1
,
sizeof
(
*
client
));
if
(
client
)
{
client
->
skt
=
skt
;
client
->
on_event
=
echo_event
;
client
->
state
=
2
;
struct
epoll_event
ev
=
{
0
};
ev
.
events
=
EPOLLIN
|
EPOLLERR
|
EPOLLHUP
|
EPOLLRDHUP
;
ev
.
data
.
ptr
=
client
;
A
(
0
==
epoll_ctl
(
ep
->
ep
,
EPOLL_CTL_ADD
,
skt
,
&
ev
),
""
);
}
}
skt
=
open_listen
(
0
);
if
(
skt
!=-
1
)
{
client_t
*
client
=
(
client_t
*
)
calloc
(
1
,
sizeof
(
*
client
));
if
(
client
)
{
client
->
skt
=
skt
;
client
->
on_event
=
echo_event
;
client
->
state
=
1
;
struct
epoll_event
ev
=
{
0
};
ev
.
events
=
EPOLLIN
|
EPOLLERR
|
EPOLLHUP
|
EPOLLRDHUP
;
ev
.
data
.
ptr
=
client
;
A
(
0
==
epoll_ctl
(
ep
->
ep
,
EPOLL_CTL_ADD
,
skt
,
&
ev
),
""
);
}
}
// char c = '\0';
// while ((c=getchar())!=EOF) {
// switch (c) {
// case 'q': break;
// default: continue;
// }
// }
// getchar();
char
*
line
=
NULL
;
size_t
linecap
=
0
;
ssize_t
linelen
;
while
((
linelen
=
getline
(
&
line
,
&
linecap
,
stdin
))
>
0
)
{
line
[
strlen
(
line
)
-
1
]
=
'\0'
;
if
(
0
==
strcmp
(
line
,
"exit"
))
break
;
if
(
0
==
strcmp
(
line
,
"quit"
))
break
;
if
(
line
==
strstr
(
line
,
"close"
))
{
int
fd
=
0
;
sscanf
(
line
,
"close %d"
,
&
fd
);
if
(
fd
<=
2
)
{
fprintf
(
stderr
,
"fd [%d] invalid
\n
"
,
fd
);
continue
;
}
A
(
0
==
epoll_ctl
(
ep
->
ep
,
EPOLL_CTL_DEL
,
fd
,
NULL
),
""
);
continue
;
}
if
(
strlen
(
line
)
==
0
)
continue
;
fprintf
(
stderr
,
"unknown cmd:[%s]
\n
"
,
line
);
}
ep_destroy
(
ep
);
D
(
""
);
return
0
;
}
ep_t
*
ep_create
(
void
)
{
ep_t
*
ep
=
(
ep_t
*
)
calloc
(
1
,
sizeof
(
*
ep
));
A
(
ep
,
"out of memory"
);
A
(
-
1
!=
(
ep
->
ep
=
epoll_create
(
1
)),
""
);
ep
->
sv
[
0
]
=
-
1
;
ep
->
sv
[
1
]
=
-
1
;
A
(
0
==
socketpair
(
AF_LOCAL
,
SOCK_STREAM
,
0
,
ep
->
sv
),
""
);
A
(
0
==
pthread_mutex_init
(
&
ep
->
lock
,
NULL
),
""
);
A
(
0
==
pthread_mutex_lock
(
&
ep
->
lock
),
""
);
struct
epoll_event
ev
=
{
0
};
ev
.
events
=
EPOLLIN
;
ev
.
data
.
ptr
=
&
ep_dummy
;
A
(
0
==
epoll_ctl
(
ep
->
ep
,
EPOLL_CTL_ADD
,
ep
->
sv
[
0
],
&
ev
),
""
);
A
(
0
==
pthread_create
(
&
ep
->
thread
,
NULL
,
routine
,
ep
),
""
);
A
(
0
==
pthread_mutex_unlock
(
&
ep
->
lock
),
""
);
return
ep
;
}
static
void
ep_destroy
(
ep_t
*
ep
)
{
A
(
ep
,
"invalid argument"
);
ep
->
stopping
=
1
;
A
(
1
==
send
(
ep
->
sv
[
1
],
"1"
,
1
,
0
),
""
);
A
(
0
==
pthread_join
(
ep
->
thread
,
NULL
),
""
);
A
(
0
==
pthread_mutex_destroy
(
&
ep
->
lock
),
""
);
A
(
0
==
close
(
ep
->
sv
[
0
]),
""
);
A
(
0
==
close
(
ep
->
sv
[
1
]),
""
);
A
(
0
==
close
(
ep
->
ep
),
""
);
free
(
ep
);
}
static
void
*
routine
(
void
*
arg
)
{
A
(
arg
,
"invalid argument"
);
ep_t
*
ep
=
(
ep_t
*
)
arg
;
while
(
!
ep
->
stopping
)
{
struct
epoll_event
evs
[
10
];
memset
(
evs
,
0
,
sizeof
(
evs
));
A
(
0
==
pthread_mutex_lock
(
&
ep
->
lock
),
""
);
A
(
ep
->
waiting
==
0
,
"internal logic error"
);
ep
->
waiting
=
1
;
A
(
0
==
pthread_mutex_unlock
(
&
ep
->
lock
),
""
);
int
r
=
epoll_wait
(
ep
->
ep
,
evs
,
sizeof
(
evs
)
/
sizeof
(
evs
[
0
]),
-
1
);
A
(
r
>
0
,
"indefinite epoll_wait shall not timeout:[%d]"
,
r
);
A
(
0
==
pthread_mutex_lock
(
&
ep
->
lock
),
""
);
A
(
ep
->
waiting
==
1
,
"internal logic error"
);
ep
->
waiting
=
0
;
A
(
0
==
pthread_mutex_unlock
(
&
ep
->
lock
),
""
);
for
(
int
i
=
0
;
i
<
r
;
++
i
)
{
struct
epoll_event
*
ev
=
evs
+
i
;
if
(
ev
->
data
.
ptr
==
&
ep_dummy
)
{
char
c
=
'\0'
;
A
(
1
==
recv
(
ep
->
sv
[
0
],
&
c
,
1
,
0
),
"internal logic error"
);
A
(
0
==
pthread_mutex_lock
(
&
ep
->
lock
),
""
);
ep
->
wakenup
=
0
;
A
(
0
==
pthread_mutex_unlock
(
&
ep
->
lock
),
""
);
D
(
"........"
);
continue
;
}
A
(
ev
->
data
.
ptr
,
"internal logic error"
);
client_t
*
client
=
(
client_t
*
)
ev
->
data
.
ptr
;
client
->
on_event
(
ep
,
ev
,
client
);
continue
;
}
}
return
NULL
;
}
static
int
open_listen
(
unsigned
short
port
)
{
int
r
=
0
;
int
skt
=
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
);
if
(
skt
==-
1
)
{
E
(
"socket() failed"
);
return
-
1
;
}
do
{
struct
sockaddr_in
si
=
{
0
};
si
.
sin_family
=
AF_INET
;
si
.
sin_addr
.
s_addr
=
inet_addr
(
"127.0.0.1"
);
si
.
sin_port
=
htons
(
port
);
r
=
bind
(
skt
,
(
struct
sockaddr
*
)
&
si
,
sizeof
(
si
));
if
(
r
)
{
E
(
"bind(%u) failed"
,
port
);
break
;
}
r
=
listen
(
skt
,
100
);
if
(
r
)
{
E
(
"listen() failed"
);
break
;
}
memset
(
&
si
,
0
,
sizeof
(
si
));
socklen_t
len
=
sizeof
(
si
);
r
=
getsockname
(
skt
,
(
struct
sockaddr
*
)
&
si
,
&
len
);
if
(
r
)
{
E
(
"getsockname() failed"
);
}
A
(
len
==
sizeof
(
si
),
"internal logic error"
);
D
(
"listenning at: %d"
,
ntohs
(
si
.
sin_port
));
return
skt
;
}
while
(
0
);
close
(
skt
);
return
-
1
;
}
static
int
open_connect
(
unsigned
short
port
)
{
int
r
=
0
;
int
skt
=
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
);
if
(
skt
==-
1
)
{
E
(
"socket() failed"
);
return
-
1
;
}
do
{
struct
sockaddr_in
si
=
{
0
};
si
.
sin_family
=
AF_INET
;
si
.
sin_addr
.
s_addr
=
inet_addr
(
"127.0.0.1"
);
si
.
sin_port
=
htons
(
port
);
r
=
connect
(
skt
,
(
struct
sockaddr
*
)
&
si
,
sizeof
(
si
));
if
(
r
)
{
E
(
"connect(%u) failed"
,
port
);
break
;
}
memset
(
&
si
,
0
,
sizeof
(
si
));
socklen_t
len
=
sizeof
(
si
);
r
=
getsockname
(
skt
,
(
struct
sockaddr
*
)
&
si
,
&
len
);
if
(
r
)
{
E
(
"getsockname() failed"
);
}
A
(
len
==
sizeof
(
si
),
"internal logic error"
);
D
(
"connected: %d"
,
ntohs
(
si
.
sin_port
));
return
skt
;
}
while
(
0
);
close
(
skt
);
return
-
1
;
}
static
void
echo_event
(
ep_t
*
ep
,
struct
epoll_event
*
ev
,
client_t
*
client
)
{
if
(
ev
->
events
&
EPOLLIN
)
{
if
(
client
->
state
==
1
)
{
struct
sockaddr_in
si
=
{
0
};
socklen_t
silen
=
sizeof
(
si
);
int
skt
=
accept
(
client
->
skt
,
(
struct
sockaddr
*
)
&
si
,
&
silen
);
if
(
skt
!=-
1
)
{
client_t
*
server
=
(
client_t
*
)
calloc
(
1
,
sizeof
(
*
server
));
if
(
server
)
{
server
->
skt
=
skt
;
server
->
on_event
=
echo_event
;
server
->
state
=
2
;
struct
epoll_event
ev
=
{
0
};
ev
.
events
=
EPOLLIN
|
EPOLLERR
|
EPOLLHUP
|
EPOLLRDHUP
;
ev
.
data
.
ptr
=
server
;
A
(
0
==
epoll_ctl
(
ep
->
ep
,
EPOLL_CTL_ADD
,
skt
,
&
ev
),
""
);
}
}
}
if
(
client
->
state
==
2
)
{
char
buf
[
4
];
int
n
=
recv
(
client
->
skt
,
buf
,
sizeof
(
buf
)
-
1
,
0
);
A
(
n
>=
0
&&
n
<
sizeof
(
buf
),
"internal logic error:[%d]"
,
n
);
buf
[
n
]
=
'\0'
;
fprintf
(
stderr
,
"events[%x]:%s
\n
"
,
ev
->
events
,
buf
);
}
}
if
(
ev
->
events
&
(
EPOLLERR
|
EPOLLHUP
|
EPOLLRDHUP
))
{
A
(
0
==
pthread_mutex_lock
(
&
ep
->
lock
),
""
);
A
(
0
==
epoll_ctl
(
ep
->
ep
,
EPOLL_CTL_DEL
,
client
->
skt
,
NULL
),
""
);
A
(
0
==
pthread_mutex_unlock
(
&
ep
->
lock
),
""
);
close
(
client
->
skt
);
client
->
skt
=
-
1
;
client
->
on_event
=
NULL
;
free
(
client
);
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录