Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
9b754d70
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
9b754d70
编写于
11月 12, 2019
作者:
weixin_48148422
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add more atomic api
上级
317c98a2
变更
29
隐藏空白更改
内联
并排
Showing
29 changed file
with
470 addition
and
168 deletion
+470
-168
src/client/src/TSDBJNIConnector.c
src/client/src/TSDBJNIConnector.c
+3
-3
src/client/src/tscJoinProcess.c
src/client/src/tscJoinProcess.c
+5
-5
src/client/src/tscSecondaryMerge.c
src/client/src/tscSecondaryMerge.c
+3
-4
src/client/src/tscServer.c
src/client/src/tscServer.c
+5
-5
src/client/src/tscStream.c
src/client/src/tscStream.c
+1
-1
src/modules/http/src/httpServer.c
src/modules/http/src/httpServer.c
+3
-3
src/modules/http/src/httpSystem.c
src/modules/http/src/httpSystem.c
+1
-1
src/os/darwin/inc/os.h
src/os/darwin/inc/os.h
+65
-22
src/os/darwin/src/tdarwin.c
src/os/darwin/src/tdarwin.c
+0
-8
src/os/linux/inc/os.h
src/os/linux/inc/os.h
+65
-22
src/os/linux/src/tlinux.c
src/os/linux/src/tlinux.c
+0
-8
src/os/windows/inc/os.h
src/os/windows/inc/os.h
+146
-25
src/os/windows/src/twindows.c
src/os/windows/src/twindows.c
+121
-9
src/rpc/src/trpc.c
src/rpc/src/trpc.c
+4
-4
src/system/detail/inc/vnode.h
src/system/detail/inc/vnode.h
+3
-3
src/system/detail/src/dnodeSystem.c
src/system/detail/src/dnodeSystem.c
+2
-2
src/system/detail/src/mgmtShell.c
src/system/detail/src/mgmtShell.c
+8
-8
src/system/detail/src/vnodeCache.c
src/system/detail/src/vnodeCache.c
+3
-3
src/system/detail/src/vnodeFile.c
src/system/detail/src/vnodeFile.c
+1
-1
src/system/detail/src/vnodeImport.c
src/system/detail/src/vnodeImport.c
+2
-2
src/system/detail/src/vnodeMeter.c
src/system/detail/src/vnodeMeter.c
+2
-2
src/system/detail/src/vnodeQueryImpl.c
src/system/detail/src/vnodeQueryImpl.c
+2
-2
src/system/detail/src/vnodeShell.c
src/system/detail/src/vnodeShell.c
+2
-2
src/system/detail/src/vnodeStore.c
src/system/detail/src/vnodeStore.c
+1
-1
src/system/detail/src/vnodeUtil.c
src/system/detail/src/vnodeUtil.c
+3
-3
src/util/src/tcache.c
src/util/src/tcache.c
+8
-8
src/util/src/textbuffer.c
src/util/src/textbuffer.c
+1
-1
src/util/src/tlog.c
src/util/src/tlog.c
+2
-2
src/util/src/ttimer.c
src/util/src/ttimer.c
+8
-8
未找到文件。
src/client/src/TSDBJNIConnector.c
浏览文件 @
9b754d70
...
...
@@ -61,13 +61,13 @@ jmethodID g_rowdataSetByteArrayFp;
void
jniGetGlobalMethod
(
JNIEnv
*
env
)
{
// make sure init function executed once
switch
(
__sync_val_compare_and_swap
_32
(
&
__init
,
0
,
1
))
{
switch
(
atomic_val_compare_exchange
_32
(
&
__init
,
0
,
1
))
{
case
0
:
break
;
case
1
:
do
{
taosMsleep
(
0
);
}
while
(
__sync_val
_load_32
(
&
__init
)
==
1
);
}
while
(
atomic
_load_32
(
&
__init
)
==
1
);
case
2
:
return
;
}
...
...
@@ -107,7 +107,7 @@ void jniGetGlobalMethod(JNIEnv *env) {
g_rowdataSetByteArrayFp
=
(
*
env
)
->
GetMethodID
(
env
,
g_rowdataClass
,
"setByteArray"
,
"(I[B)V"
);
(
*
env
)
->
DeleteLocalRef
(
env
,
rowdataClass
);
__sync_val_re
store_32
(
&
__init
,
2
);
atomic_
store_32
(
&
__init
,
2
);
jniTrace
(
"native method register finished"
);
}
...
...
src/client/src/tscJoinProcess.c
浏览文件 @
9b754d70
...
...
@@ -353,7 +353,7 @@ static void doQuitSubquery(SSqlObj* pParentSql) {
}
static
void
quitAllSubquery
(
SSqlObj
*
pSqlObj
,
SJoinSubquerySupporter
*
pSupporter
)
{
if
(
__sync_add_an
d_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
if
(
atomic_ad
d_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
pSqlObj
->
res
.
code
=
abs
(
pSupporter
->
pState
->
code
);
tscError
(
"%p all subquery return and query failed, global code:%d"
,
pSqlObj
,
pSqlObj
->
res
.
code
);
...
...
@@ -412,7 +412,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
taos_fetch_rows_a
(
tres
,
joinRetrieveCallback
,
param
);
}
else
if
(
numOfRows
==
0
)
{
// no data from this vnode anymore
if
(
__sync_add_an
d_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
if
(
atomic_ad
d_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
if
(
pSupporter
->
pState
->
code
!=
TSDB_CODE_SUCCESS
)
{
tscTrace
(
"%p sub:%p, numOfSub:%d, quit from further procedure due to other queries failure"
,
pParentSql
,
tres
,
...
...
@@ -451,7 +451,7 @@ static void joinRetrieveCallback(void* param, TAOS_RES* tres, int numOfRows) {
tscError
(
"%p retrieve failed, code:%d, index:%d"
,
pSql
,
numOfRows
,
pSupporter
->
subqueryIndex
);
}
if
(
__sync_add_an
d_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
if
(
atomic_ad
d_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
tscTrace
(
"%p secondary retrieve completed, global code:%d"
,
tres
,
pParentSql
->
res
.
code
);
if
(
pSupporter
->
pState
->
code
!=
TSDB_CODE_SUCCESS
)
{
pParentSql
->
res
.
code
=
abs
(
pSupporter
->
pState
->
code
);
...
...
@@ -560,7 +560,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
SJoinSubquerySupporter
*
pSupporter
=
(
SJoinSubquerySupporter
*
)
param
;
// if (
__sync_add_an
d_fetch_32(pSupporter->numOfComplete, 1) >=
// if (
atomic_ad
d_fetch_32(pSupporter->numOfComplete, 1) >=
// pSupporter->numOfTotal) {
// SSqlObj *pParentObj = pSupporter->pObj;
//
...
...
@@ -605,7 +605,7 @@ void tscJoinQueryCallback(void* param, TAOS_RES* tres, int code) {
quitAllSubquery
(
pParentSql
,
pSupporter
);
}
else
{
if
(
__sync_add_an
d_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
if
(
atomic_ad
d_fetch_32
(
&
pSupporter
->
pState
->
numOfCompleted
,
1
)
>=
pSupporter
->
pState
->
numOfTotal
)
{
tscSetupOutputColumnIndex
(
pParentSql
);
if
(
pParentSql
->
fp
==
NULL
)
{
...
...
src/client/src/tscSecondaryMerge.c
浏览文件 @
9b754d70
...
...
@@ -432,11 +432,10 @@ void tscDestroyLocalReducer(SSqlObj *pSql) {
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
// there is no more result, so we release all allocated resource
SLocalReducer
*
pLocalReducer
=
(
SLocalReducer
*
)
__sync_val_compare_and_swap_64
(
&
pRes
->
pLocalReducer
,
pRes
->
pLocalReducer
,
0
);
SLocalReducer
*
pLocalReducer
=
(
SLocalReducer
*
)
atomic_exchange_ptr
(
&
pRes
->
pLocalReducer
,
NULL
);
if
(
pLocalReducer
!=
NULL
)
{
int32_t
status
=
0
;
while
((
status
=
__sync_val_compare_and_swap
_32
(
&
pLocalReducer
->
status
,
TSC_LOCALREDUCE_READY
,
while
((
status
=
atomic_val_compare_exchange
_32
(
&
pLocalReducer
->
status
,
TSC_LOCALREDUCE_READY
,
TSC_LOCALREDUCE_TOBE_FREED
))
==
TSC_LOCALREDUCE_IN_PROGRESS
)
{
taosMsleep
(
100
);
tscTrace
(
"%p waiting for delete procedure, status: %d"
,
pSql
,
status
);
...
...
@@ -1328,7 +1327,7 @@ int32_t tscLocalDoReduce(SSqlObj *pSql) {
// set the data merge in progress
int32_t
prevStatus
=
__sync_val_compare_and_swap
_32
(
&
pLocalReducer
->
status
,
TSC_LOCALREDUCE_READY
,
TSC_LOCALREDUCE_IN_PROGRESS
);
atomic_val_compare_exchange
_32
(
&
pLocalReducer
->
status
,
TSC_LOCALREDUCE_READY
,
TSC_LOCALREDUCE_IN_PROGRESS
);
if
(
prevStatus
!=
TSC_LOCALREDUCE_READY
||
pLocalReducer
==
NULL
)
{
assert
(
prevStatus
==
TSC_LOCALREDUCE_TOBE_FREED
);
// it is in tscDestroyLocalReducer function already
return
TSDB_CODE_SUCCESS
;
...
...
src/client/src/tscServer.c
浏览文件 @
9b754d70
...
...
@@ -1023,13 +1023,13 @@ static void tscHandleSubRetrievalError(SRetrieveSupport *trsupport, SSqlObj *pSq
tscProcessSql
(
pNew
);
return
;
}
else
{
// reach the maximum retry count, abort
__sync_val_compare_and_swap
_32
(
&
trsupport
->
pState
->
code
,
TSDB_CODE_SUCCESS
,
numOfRows
);
atomic_val_compare_exchange
_32
(
&
trsupport
->
pState
->
code
,
TSDB_CODE_SUCCESS
,
numOfRows
);
tscError
(
"%p sub:%p retrieve failed,code:%d,orderOfSub:%d failed.no more retry,set global code:%d"
,
pPObj
,
pSql
,
numOfRows
,
idx
,
trsupport
->
pState
->
code
);
}
}
if
(
__sync_add_an
d_fetch_32
(
&
trsupport
->
pState
->
numOfCompleted
,
1
)
<
trsupport
->
pState
->
numOfTotal
)
{
if
(
atomic_ad
d_fetch_32
(
&
trsupport
->
pState
->
numOfCompleted
,
1
)
<
trsupport
->
pState
->
numOfTotal
)
{
return
tscFreeSubSqlObj
(
trsupport
,
pSql
);
}
...
...
@@ -1095,7 +1095,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
if
(
numOfRows
>
0
)
{
assert
(
pRes
->
numOfRows
==
numOfRows
);
__sync_add_an
d_fetch_64
(
&
trsupport
->
pState
->
numOfRetrievedRows
,
numOfRows
);
atomic_ad
d_fetch_64
(
&
trsupport
->
pState
->
numOfRetrievedRows
,
numOfRows
);
tscTrace
(
"%p sub:%p retrieve numOfRows:%d totalNumOfRows:%d from ip:%u,vid:%d,orderOfSub:%d"
,
pPObj
,
pSql
,
pRes
->
numOfRows
,
trsupport
->
pState
->
numOfRetrievedRows
,
pSvd
->
ip
,
pSvd
->
vnode
,
idx
);
...
...
@@ -1154,7 +1154,7 @@ void tscRetrieveFromVnodeCallBack(void *param, TAOS_RES *tres, int numOfRows) {
return
tscAbortFurtherRetryRetrieval
(
trsupport
,
tres
,
TSDB_CODE_CLI_NO_DISKSPACE
);
}
if
(
__sync_add_an
d_fetch_32
(
&
trsupport
->
pState
->
numOfCompleted
,
1
)
<
trsupport
->
pState
->
numOfTotal
)
{
if
(
atomic_ad
d_fetch_32
(
&
trsupport
->
pState
->
numOfCompleted
,
1
)
<
trsupport
->
pState
->
numOfTotal
)
{
return
tscFreeSubSqlObj
(
trsupport
,
pSql
);
}
...
...
@@ -1283,7 +1283,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
if
(
trsupport
->
numOfRetry
++
>=
MAX_NUM_OF_SUBQUERY_RETRY
)
{
tscTrace
(
"%p sub:%p reach the max retry count,set global code:%d"
,
trsupport
->
pParentSqlObj
,
pSql
,
code
);
__sync_val_compare_and_swap
_32
(
&
trsupport
->
pState
->
code
,
0
,
code
);
atomic_val_compare_exchange
_32
(
&
trsupport
->
pState
->
code
,
0
,
code
);
}
else
{
// does not reach the maximum retry count, go on
tscTrace
(
"%p sub:%p failed code:%d, retry:%d"
,
trsupport
->
pParentSqlObj
,
pSql
,
code
,
trsupport
->
numOfRetry
);
...
...
src/client/src/tscStream.c
浏览文件 @
9b754d70
...
...
@@ -546,7 +546,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
void
taos_close_stream
(
TAOS_STREAM
*
handle
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
handle
;
SSqlObj
*
pSql
=
(
SSqlObj
*
)
__sync_val_compare_and_swap_64
(
&
pStream
->
pSql
,
pStream
->
pSql
,
0
);
SSqlObj
*
pSql
=
(
SSqlObj
*
)
atomic_exchange_ptr
(
&
pStream
->
pSql
,
0
);
if
(
pSql
==
NULL
)
{
return
;
}
...
...
src/modules/http/src/httpServer.c
浏览文件 @
9b754d70
...
...
@@ -72,7 +72,7 @@ void httpRemoveContextFromEpoll(HttpThread *pThread, HttpContext *pContext) {
}
bool
httpAlterContextState
(
HttpContext
*
pContext
,
HttpContextState
srcState
,
HttpContextState
destState
)
{
return
(
__sync_val_compare_and_swap
_32
(
&
pContext
->
state
,
srcState
,
destState
)
==
srcState
);
return
(
atomic_val_compare_exchange
_32
(
&
pContext
->
state
,
srcState
,
destState
)
==
srcState
);
}
void
httpFreeContext
(
HttpServer
*
pServer
,
HttpContext
*
pContext
);
...
...
@@ -124,7 +124,7 @@ void httpCleanUpContextTimer(HttpContext *pContext) {
void
httpCleanUpContext
(
HttpContext
*
pContext
)
{
httpTrace
(
"context:%p, start the clean up operation"
,
pContext
);
__sync_val_compare_and_swap_64
(
&
pContext
->
signature
,
pContext
,
0
);
atomic_val_compare_exchange_ptr
(
&
pContext
->
signature
,
pContext
,
0
);
if
(
pContext
->
signature
!=
NULL
)
{
httpTrace
(
"context:%p is freed by another thread."
,
pContext
);
return
;
...
...
@@ -494,7 +494,7 @@ void httpProcessHttpData(void *param) {
}
else
{
if
(
httpReadData
(
pThread
,
pContext
))
{
(
*
(
pThread
->
processData
))(
pContext
);
__sync_fetch_and_add
(
&
pThread
->
pServer
->
requestNum
,
1
);
atomic_fetch_add_32
(
&
pThread
->
pServer
->
requestNum
,
1
);
}
}
}
...
...
src/modules/http/src/httpSystem.c
浏览文件 @
9b754d70
...
...
@@ -148,7 +148,7 @@ void httpCleanUpSystem() {
void
httpGetReqCount
(
int32_t
*
httpReqestNum
)
{
if
(
httpServer
!=
NULL
)
{
*
httpReqestNum
=
__sync_fetch_and_and
(
&
httpServer
->
requestNum
,
0
);
*
httpReqestNum
=
atomic_exchange_32
(
&
httpServer
->
requestNum
,
0
);
}
else
{
*
httpReqestNum
=
0
;
}
...
...
src/os/darwin/inc/os.h
浏览文件 @
9b754d70
...
...
@@ -73,28 +73,71 @@
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins.
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_add_and_fetch_16 __sync_add_and_fetch
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch
int32_t
__sync_val_load_32
(
int32_t
*
ptr
);
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
);
#define atomic_val_compare_exchange_8 __sync_val_compare_and_swap
#define atomic_val_compare_exchange_16 __sync_val_compare_and_swap
#define atomic_val_compare_exchange_32 __sync_val_compare_and_swap
#define atomic_val_compare_exchange_64 __sync_val_compare_and_swap
#define atomic_val_compare_exchange_ptr __sync_val_compare_and_swap
#define atomic_add_fetch_8(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_16(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_32(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_64(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_ptr(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_8(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_16(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_32(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_64(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_ptr(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_8(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_16(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_32(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_64(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_ptr(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_8(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_16(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_32(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_64(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_ptr(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_8(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_16(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_32(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_64(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_ptr(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_8(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_16(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_32(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_64(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_ptr(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_8(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_16(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_32(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_64(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_ptr(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_8(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_16(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_32(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_64(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_ptr(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_8(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_16(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_32(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_64(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_ptr(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_8(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_16(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_32(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_64(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_ptr(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define SWAP(a, b, c) \
do { \
...
...
src/os/darwin/src/tdarwin.c
浏览文件 @
9b754d70
...
...
@@ -416,11 +416,3 @@ int tsem_post(dispatch_semaphore_t *sem) {
int
tsem_destroy
(
dispatch_semaphore_t
*
sem
)
{
return
0
;
}
int32_t
__sync_val_load_32
(
int32_t
*
ptr
)
{
return
__atomic_load_n
(
ptr
,
__ATOMIC_ACQUIRE
);
}
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
)
{
__atomic_store_n
(
ptr
,
newval
,
__ATOMIC_RELEASE
);
}
\ No newline at end of file
src/os/linux/inc/os.h
浏览文件 @
9b754d70
...
...
@@ -90,28 +90,71 @@ extern "C" {
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins.
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_add_and_fetch_16 __sync_add_and_fetch
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch
int32_t
__sync_val_load_32
(
int32_t
*
ptr
);
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
);
#define atomic_val_compare_exchange_8 __sync_val_compare_and_swap
#define atomic_val_compare_exchange_16 __sync_val_compare_and_swap
#define atomic_val_compare_exchange_32 __sync_val_compare_and_swap
#define atomic_val_compare_exchange_64 __sync_val_compare_and_swap
#define atomic_val_compare_exchange_ptr __sync_val_compare_and_swap
#define atomic_add_fetch_8(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_16(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_32(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_64(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_add_fetch_ptr(ptr, val) __atomic_add_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_8(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_16(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_32(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_64(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_add_ptr(ptr, val) __atomic_fetch_add((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_8(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_16(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_32(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_64(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_sub_fetch_ptr(ptr, val) __atomic_sub_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_8(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_16(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_32(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_64(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_sub_ptr(ptr, val) __atomic_fetch_sub((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_8(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_16(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_32(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_64(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_and_fetch_ptr(ptr, val) __atomic_and_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_8(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_16(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_32(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_64(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_and_ptr(ptr, val) __atomic_fetch_and((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_8(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_16(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_32(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_64(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_or_fetch_ptr(ptr, val) __atomic_or_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_8(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_16(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_32(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_64(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_or_ptr(ptr, val) __atomic_fetch_or((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_8(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_16(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_32(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_64(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_xor_fetch_ptr(ptr, val) __atomic_xor_fetch((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_8(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_16(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_32(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_64(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_fetch_xor_ptr(ptr, val) __atomic_fetch_xor((ptr), (val), __ATOMIC_SEQ_CST)
#define SWAP(a, b, c) \
do { \
...
...
src/os/linux/src/tlinux.c
浏览文件 @
9b754d70
...
...
@@ -340,11 +340,3 @@ bool taosSkipSocketCheck() {
return
false
;
}
int32_t
__sync_val_load_32
(
int32_t
*
ptr
)
{
return
__atomic_load_n
(
ptr
,
__ATOMIC_ACQUIRE
);
}
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
)
{
__atomic_store_n
(
ptr
,
newval
,
__ATOMIC_RELEASE
);
}
src/os/windows/inc/os.h
浏览文件 @
9b754d70
...
...
@@ -81,6 +81,10 @@ extern "C" {
#if defined(_M_ARM) || defined(_M_ARM64)
/* the '__iso_volatile' functions does not use a memory fence, so these
* definitions are incorrect, comment out as we don't support Windows on
* ARM at present.
#define atomic_load_8(ptr) __iso_volatile_load8((const volatile __int8*)(ptr))
#define atomic_load_16(ptr) __iso_volatile_load16((const volatile __int16*)(ptr))
#define atomic_load_32(ptr) __iso_volatile_load32((const volatile __int32*)(ptr))
...
...
@@ -98,7 +102,7 @@ extern "C" {
#define atomic_load_ptr atomic_load_32
#define atomic_store_ptr atomic_store_32
#endif
*/
#else
#define atomic_load_8(ptr) (*(char volatile*)(ptr))
...
...
@@ -121,35 +125,152 @@ extern "C" {
#define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val))
#define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val))
#define __sync_val_compare_and_swap_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval))
#define __sync_val_compare_and_swap_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval))
#define __sync_val_compare_and_swap_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval))
#define __sync_val_compare_and_swap_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval))
#define __sync_val_compare_and_swap_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval))
char
interlocked_add_8
(
char
volatile
*
ptr
,
char
val
);
short
interlocked_add_16
(
short
volatile
*
ptr
,
short
val
);
long
interlocked_add_32
(
long
volatile
*
ptr
,
long
val
);
__int64
interlocked_add_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define __sync_add_and_fetch_8(ptr, val) interlocked_add_8((char volatile*)(ptr), (char)(val))
#define __sync_add_and_fetch_16(ptr, val) interlocked_add_16((short volatile*)(ptr), (short)(val))
#define __sync_add_and_fetch_32(ptr, val) interlocked_add_32((long volatile*)(ptr), (long)(val))
#define __sync_add_and_fetch_64(ptr, val) interlocked_add_64((__int64 volatile*)(ptr), (__int64)(val))
#define atomic_val_compare_exchange_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval))
#define atomic_val_compare_exchange_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval))
#define atomic_val_compare_exchange_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval))
#define atomic_val_compare_exchange_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval))
#define atomic_val_compare_exchange_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval))
char
interlocked_add_fetch_8
(
char
volatile
*
ptr
,
char
val
);
short
interlocked_add_fetch_16
(
short
volatile
*
ptr
,
short
val
);
long
interlocked_add_fetch_32
(
long
volatile
*
ptr
,
long
val
);
__int64
interlocked_add_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define atomic_add_fetch_8(ptr, val) interlocked_add_fetch_8((char volatile*)(ptr), (char)(val))
#define atomic_add_fetch_16(ptr, val) interlocked_add_fetch_16((short volatile*)(ptr), (short)(val))
#define atomic_add_fetch_32(ptr, val) interlocked_add_fetch_32((long volatile*)(ptr), (long)(val))
#define atomic_add_fetch_64(ptr, val) interlocked_add_fetch_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define atomic_add_fetch_ptr atomic_add_fetch_64
#else
#define atomic_add_fetch_ptr atomic_add_fetch_32
#endif
#define atomic_fetch_add_8(ptr, val) _InterlockedExchangeAdd8((char volatile*)(ptr), (char)(val))
#define atomic_fetch_add_16(ptr, val) _InterlockedExchangeAdd16((short volatile*)(ptr), (short)(val))
#define atomic_fetch_add_32(ptr, val) _InterlockedExchangeAdd((long volatile*)(ptr), (long)(val))
#define atomic_fetch_add_64(ptr, val) _InterlockedExchangeAdd64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define atomic_fetch_add_ptr atomic_fetch_add_64
#else
#define atomic_fetch_add_ptr atomic_fetch_add_32
#endif
#define atomic_sub_fetch_8(ptr, val) interlocked_add_fetch_8((char volatile*)(ptr), -(char)(val))
#define atomic_sub_fetch_16(ptr, val) interlocked_add_fetch_16((short volatile*)(ptr), -(short)(val))
#define atomic_sub_fetch_32(ptr, val) interlocked_add_fetch_32((long volatile*)(ptr), -(long)(val))
#define atomic_sub_fetch_64(ptr, val) interlocked_add_fetch_64((__int64 volatile*)(ptr), -(__int64)(val))
#ifdef _WIN64
#define atomic_sub_fetch_ptr atomic_sub_fetch_64
#else
#define atomic_sub_fetch_ptr atomic_sub_fetch_32
#endif
#define atomic_fetch_sub_8(ptr, val) _InterlockedExchangeAdd8((char volatile*)(ptr), -(char)(val))
#define atomic_fetch_sub_16(ptr, val) _InterlockedExchangeAdd16((short volatile*)(ptr), -(short)(val))
#define atomic_fetch_sub_32(ptr, val) _InterlockedExchangeAdd((long volatile*)(ptr), -(long)(val))
#define atomic_fetch_sub_64(ptr, val) _InterlockedExchangeAdd64((__int64 volatile*)(ptr), -(__int64)(val))
#ifdef _WIN64
#define atomic_fetch_sub_ptr atomic_fetch_sub_64
#else
#define atomic_fetch_sub_ptr atomic_fetch_sub_32
#endif
char
interlocked_and_fetch_8
(
char
volatile
*
ptr
,
char
val
);
short
interlocked_and_fetch_16
(
short
volatile
*
ptr
,
short
val
);
long
interlocked_and_fetch_32
(
long
volatile
*
ptr
,
long
val
);
__int64
interlocked_and_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define atomic_and_fetch_8(ptr, val) interlocked_and_fetch_8((char volatile*)(ptr), (char)(val))
#define atomic_and_fetch_16(ptr, val) interlocked_and_fetch_16((short volatile*)(ptr), (short)(val))
#define atomic_and_fetch_32(ptr, val) interlocked_and_fetch_32((long volatile*)(ptr), (long)(val))
#define atomic_and_fetch_64(ptr, val) interlocked_and_fetch_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define atomic_and_fetch_ptr atomic_and_fetch_64
#else
#define atomic_and_fetch_ptr atomic_and_fetch_32
#endif
#define atomic_fetch_and_8(ptr, val) _InterlockedAnd8((char volatile*)(ptr), (char)(val))
#define atomic_fetch_and_16(ptr, val) _InterlockedAnd16((short volatile*)(ptr), (short)(val))
#define atomic_fetch_and_32(ptr, val) _InterlockedAnd((long volatile*)(ptr), (long)(val))
#ifdef _M_IX86
__int64
interlocked_fetch_and_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define atomic_fetch_and_64(ptr, val) interlocked_fetch_and_64((__int64 volatile*)(ptr), (__int64)(val))
#else
#define atomic_fetch_and_64(ptr, val) _InterlockedAnd64((__int64 volatile*)(ptr), (__int64)(val))
#endif
#ifdef _WIN64
#define atomic_fetch_and_ptr atomic_fetch_and_64
#else
#define atomic_fetch_and_ptr atomic_fetch_and_32
#endif
char
interlocked_or_fetch_8
(
char
volatile
*
ptr
,
char
val
);
short
interlocked_or_fetch_16
(
short
volatile
*
ptr
,
short
val
);
long
interlocked_or_fetch_32
(
long
volatile
*
ptr
,
long
val
);
__int64
interlocked_or_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define atomic_or_fetch_8(ptr, val) interlocked_or_fetch_8((char volatile*)(ptr), (char)(val))
#define atomic_or_fetch_16(ptr, val) interlocked_or_fetch_16((short volatile*)(ptr), (short)(val))
#define atomic_or_fetch_32(ptr, val) interlocked_or_fetch_32((long volatile*)(ptr), (long)(val))
#define atomic_or_fetch_64(ptr, val) interlocked_or_fetch_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define atomic_or_fetch_ptr atomic_or_fetch_64
#else
#define atomic_or_fetch_ptr atomic_or_fetch_32
#endif
#define atomic_fetch_or_8(ptr, val) _InterlockedOr8((char volatile*)(ptr), (char)(val))
#define atomic_fetch_or_16(ptr, val) _InterlockedOr16((short volatile*)(ptr), (short)(val))
#define atomic_fetch_or_32(ptr, val) _InterlockedOr((long volatile*)(ptr), (long)(val))
#ifdef _M_IX86
__int64
interlocked_fetch_or_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define atomic_fetch_or_64(ptr, val) interlocked_fetch_or_64((__int64 volatile*)(ptr), (__int64)(val))
#else
#define atomic_fetch_or_64(ptr, val) _InterlockedOr64((__int64 volatile*)(ptr), (__int64)(val))
#endif
#ifdef _WIN64
#define atomic_fetch_or_ptr atomic_fetch_or_64
#else
#define atomic_fetch_or_ptr atomic_fetch_or_32
#endif
char
interlocked_xor_fetch_8
(
char
volatile
*
ptr
,
char
val
);
short
interlocked_xor_fetch_16
(
short
volatile
*
ptr
,
short
val
);
long
interlocked_xor_fetch_32
(
long
volatile
*
ptr
,
long
val
);
__int64
interlocked_xor_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define atomic_xor_fetch_8(ptr, val) interlocked_xor_fetch_8((char volatile*)(ptr), (char)(val))
#define atomic_xor_fetch_16(ptr, val) interlocked_xor_fetch_16((short volatile*)(ptr), (short)(val))
#define atomic_xor_fetch_32(ptr, val) interlocked_xor_fetch_32((long volatile*)(ptr), (long)(val))
#define atomic_xor_fetch_64(ptr, val) interlocked_xor_fetch_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define
__sync_add_and_fetch_ptr __sync_add_and
_fetch_64
#define
atomic_xor_fetch_ptr atomic_xor
_fetch_64
#else
#define
__sync_add_and_fetch_ptr __sync_add_and
_fetch_32
#define
atomic_xor_fetch_ptr atomic_xor
_fetch_32
#endif
#define __sync_sub_and_fetch_8(ptr, val) __sync_add_and_fetch_8((ptr), -(val))
#define __sync_sub_and_fetch_16(ptr, val) __sync_add_and_fetch_16((ptr), -(val))
#define __sync_sub_and_fetch_32(ptr, val) __sync_add_and_fetch_32((ptr), -(val))
#define __sync_sub_and_fetch_64(ptr, val) __sync_add_and_fetch_64((ptr), -(val))
#define __sync_sub_and_fetch_ptr(ptr, val) __sync_add_and_fetch_ptr((ptr), -(val))
#define atomic_fetch_xor_8(ptr, val) _InterlockedXor8((char volatile*)(ptr), (char)(val))
#define atomic_fetch_xor_16(ptr, val) _InterlockedXor16((short volatile*)(ptr), (short)(val))
#define atomic_fetch_xor_32(ptr, val) _InterlockedXor((long volatile*)(ptr), (long)(val))
int32_t
__sync_val_load_32
(
int32_t
*
ptr
);
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
);
#ifdef _M_IX86
__int64
interlocked_fetch_xor_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define atomic_fetch_xor_64(ptr, val) interlocked_fetch_xor_64((__int64 volatile*)(ptr), (__int64)(val))
#else
#define atomic_fetch_xor_64(ptr, val) _InterlockedXor64((__int64 volatile*)(ptr), (__int64)(val))
#endif
#ifdef _WIN64
#define atomic_fetch_xor_ptr atomic_fetch_xor_64
#else
#define atomic_fetch_xor_ptr atomic_fetch_xor_32
#endif
#define SWAP(a, b, c) \
do { \
...
...
src/os/windows/src/twindows.c
浏览文件 @
9b754d70
...
...
@@ -66,31 +66,143 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle
return
setsockopt
(
socketfd
,
level
,
optname
,
optval
,
optlen
);
}
char
interlocked_add_8
(
char
volatile
*
ptr
,
char
val
)
{
// add
char
interlocked_add_
fetch_
8
(
char
volatile
*
ptr
,
char
val
)
{
return
_InterlockedExchangeAdd8
(
ptr
,
val
)
+
val
;
}
short
interlocked_add_16
(
short
volatile
*
ptr
,
short
val
)
{
short
interlocked_add_
fetch_
16
(
short
volatile
*
ptr
,
short
val
)
{
return
_InterlockedExchangeAdd16
(
ptr
,
val
)
+
val
;
}
long
interlocked_add_32
(
long
volatile
*
ptr
,
long
val
)
{
long
interlocked_add_
fetch_
32
(
long
volatile
*
ptr
,
long
val
)
{
return
_InterlockedExchangeAdd
(
ptr
,
val
)
+
val
;
}
__int64
interlocked_add_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
__int64
interlocked_add_
fetch_
64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
return
_InterlockedExchangeAdd64
(
ptr
,
val
)
+
val
;
}
int32_t
__sync_val_load_32
(
int32_t
*
ptr
)
{
return
InterlockedOr
(
ptr
,
0
);
// and
char
interlocked_and_fetch_8
(
char
volatile
*
ptr
,
char
val
)
{
return
_InterlockedAnd8
(
ptr
,
val
)
&
val
;
}
short
interlocked_and_fetch_16
(
short
volatile
*
ptr
,
short
val
)
{
return
_InterlockedAnd16
(
ptr
,
val
)
&
val
;
}
long
interlocked_and_fetch_32
(
long
volatile
*
ptr
,
long
val
)
{
return
_InterlockedAnd
(
ptr
,
val
)
&
val
;
}
#ifndef _M_IX86
__int64
interlocked_and_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
return
_InterlockedAnd64
(
ptr
,
val
)
&
val
;
}
#else
__int64
interlocked_and_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
__int64
old
,
res
;
do
{
old
=
*
ptr
;
res
=
old
&
val
;
}
while
(
_InterlockedCompareExchange64
(
ptr
,
res
,
old
)
!=
old
);
return
res
;
}
__int64
interlocked_fetch_and_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
__int64
old
;
do
{
old
=
*
ptr
;
}
while
(
_InterlockedCompareExchange64
(
ptr
,
old
&
val
,
old
)
!=
old
);
return
old
;
}
#endif
// or
char
interlocked_or_fetch_8
(
char
volatile
*
ptr
,
char
val
)
{
return
_InterlockedOr8
(
ptr
,
val
)
|
val
;
}
short
interlocked_or_fetch_16
(
short
volatile
*
ptr
,
short
val
)
{
return
_InterlockedOr16
(
ptr
,
val
)
|
val
;
}
long
interlocked_or_fetch_32
(
long
volatile
*
ptr
,
long
val
)
{
return
_InterlockedOr
(
ptr
,
val
)
|
val
;
}
#ifndef _M_IX86
__int64
interlocked_or_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
return
_InterlockedOr64
(
ptr
,
val
)
&
val
;
}
#else
__int64
interlocked_or_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
__int64
old
,
res
;
do
{
old
=
*
ptr
;
res
=
old
|
val
;
}
while
(
_InterlockedCompareExchange64
(
ptr
,
res
,
old
)
!=
old
);
return
res
;
}
__int64
interlocked_fetch_or_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
__int64
old
;
do
{
old
=
*
ptr
;
}
while
(
_InterlockedCompareExchange64
(
ptr
,
old
|
val
,
old
)
!=
old
);
return
old
;
}
#endif
// xor
char
interlocked_xor_fetch_8
(
char
volatile
*
ptr
,
char
val
)
{
return
_InterlockedXor8
(
ptr
,
val
)
^
val
;
}
short
interlocked_xor_fetch_16
(
short
volatile
*
ptr
,
short
val
)
{
return
_InterlockedXor16
(
ptr
,
val
)
^
val
;
}
long
interlocked_xor_fetch_32
(
long
volatile
*
ptr
,
long
val
)
{
return
_InterlockedXor
(
ptr
,
val
)
^
val
;
}
#ifndef _M_IX86
__int64
interlocked_xor_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
return
_InterlockedXor64
(
ptr
,
val
)
^
val
;
}
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
)
{
InterlockedCompareExchange
(
ptr
,
*
ptr
,
newval
);
#else
__int64
interlocked_xor_fetch_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
__int64
old
,
res
;
do
{
old
=
*
ptr
;
res
=
old
^
val
;
}
while
(
_InterlockedCompareExchange64
(
ptr
,
res
,
old
)
!=
old
);
return
res
;
}
__int64
interlocked_fetch_xor_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
__int64
old
;
do
{
old
=
*
ptr
;
}
while
(
_InterlockedCompareExchange64
(
ptr
,
old
^
val
,
old
)
!=
old
);
return
old
;
}
#endif
void
tsPrintOsInfo
()
{}
char
*
taosCharsetReplace
(
char
*
charsetstr
)
{
...
...
src/rpc/src/trpc.c
浏览文件 @
9b754d70
...
...
@@ -164,8 +164,8 @@ char *taosBuildReqHeader(void *param, char type, char *msg) {
pHeader
->
spi
=
0
;
pHeader
->
tcp
=
0
;
pHeader
->
encrypt
=
0
;
pHeader
->
tranId
=
__sync_add_an
d_fetch_32
(
&
pConn
->
tranId
,
1
);
if
(
pHeader
->
tranId
==
0
)
pHeader
->
tranId
=
__sync_add_an
d_fetch_32
(
&
pConn
->
tranId
,
1
);
pHeader
->
tranId
=
atomic_ad
d_fetch_32
(
&
pConn
->
tranId
,
1
);
if
(
pHeader
->
tranId
==
0
)
pHeader
->
tranId
=
atomic_ad
d_fetch_32
(
&
pConn
->
tranId
,
1
);
pHeader
->
sourceId
=
pConn
->
ownId
;
pHeader
->
destId
=
pConn
->
peerId
;
...
...
@@ -196,8 +196,8 @@ char *taosBuildReqMsgWithSize(void *param, char type, int size) {
pHeader
->
spi
=
0
;
pHeader
->
tcp
=
0
;
pHeader
->
encrypt
=
0
;
pHeader
->
tranId
=
__sync_add_an
d_fetch_32
(
&
pConn
->
tranId
,
1
);
if
(
pHeader
->
tranId
==
0
)
pHeader
->
tranId
=
__sync_add_an
d_fetch_32
(
&
pConn
->
tranId
,
1
);
pHeader
->
tranId
=
atomic_ad
d_fetch_32
(
&
pConn
->
tranId
,
1
);
if
(
pHeader
->
tranId
==
0
)
pHeader
->
tranId
=
atomic_ad
d_fetch_32
(
&
pConn
->
tranId
,
1
);
pHeader
->
sourceId
=
pConn
->
ownId
;
pHeader
->
destId
=
pConn
->
peerId
;
...
...
src/system/detail/inc/vnode.h
浏览文件 @
9b754d70
...
...
@@ -218,15 +218,15 @@ typedef struct {
* Only the QInfo.signature == QInfo, this structure can be released safely.
*/
#define TSDB_QINFO_QUERY_FLAG 0x1
#define TSDB_QINFO_RESET_SIG(x)
((x)->signature =
(uint64_t)(x))
#define TSDB_QINFO_RESET_SIG(x)
atomic_store_64(&((x)->signature),
(uint64_t)(x))
#define TSDB_QINFO_SET_QUERY_FLAG(x) \
__sync_val_compare_and_swap
(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG);
atomic_val_compare_exchange_64
(&((x)->signature), (uint64_t)(x), TSDB_QINFO_QUERY_FLAG);
// live lock: wait for query reaching a safe-point, release all resources
// belongs to this query
#define TSDB_WAIT_TO_SAFE_DROP_QINFO(x) \
{ \
while (
__sync_val_compare_and_swap
(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \
while (
atomic_val_compare_exchange_64
(&((x)->signature), (x), 0) == TSDB_QINFO_QUERY_FLAG) { \
taosMsleep(1); \
} \
}
...
...
src/system/detail/src/dnodeSystem.c
浏览文件 @
9b754d70
...
...
@@ -216,8 +216,8 @@ void dnodeResetSystem() {
void
dnodeCountRequest
(
SCountInfo
*
info
)
{
httpGetReqCount
(
&
info
->
httpReqNum
);
info
->
selectReqNum
=
__sync_fetch_and_and
(
&
vnodeSelectReqNum
,
0
);
info
->
insertReqNum
=
__sync_fetch_and_and
(
&
vnodeInsertReqNum
,
0
);
info
->
selectReqNum
=
atomic_exchange_32
(
&
vnodeSelectReqNum
,
0
);
info
->
insertReqNum
=
atomic_exchange_32
(
&
vnodeInsertReqNum
,
0
);
}
#pragma GCC diagnostic pop
\ No newline at end of file
src/system/detail/src/mgmtShell.c
浏览文件 @
9b754d70
...
...
@@ -922,8 +922,8 @@ int mgmtProcessRetrieveMsg(char *pMsg, int msgLen, SConnObj *pConn) {
taosSendMsgToPeer
(
pConn
->
thandle
,
pStart
,
msgLen
);
if
(
rowsToRead
==
0
)
{
int64_t
oldSign
=
__sync_val_compare_and_swap
(
&
pShow
->
signature
,
(
uint64_t
)
pShow
,
0
);
if
(
oldSign
!=
(
uint
64
_t
)
pShow
)
{
uintptr_t
oldSign
=
atomic_val_compare_exchange_ptr
(
&
pShow
->
signature
,
pShow
,
0
);
if
(
oldSign
!=
(
uint
ptr
_t
)
pShow
)
{
return
msgLen
;
}
// pShow->signature = 0;
...
...
@@ -1093,8 +1093,8 @@ int mgmtProcessHeartBeatMsg(char *cont, int contLen, SConnObj *pConn) {
}
void
mgmtEstablishConn
(
SConnObj
*
pConn
)
{
__sync_fetch_and_add
(
&
mgmtShellConns
,
1
);
__sync_fetch_and_add
(
&
sdbExtConns
,
1
);
atomic_fetch_add_32
(
&
mgmtShellConns
,
1
);
atomic_fetch_add_32
(
&
sdbExtConns
,
1
);
pConn
->
stime
=
taosGetTimestampMs
();
if
(
strcmp
(
pConn
->
pUser
->
user
,
"root"
)
==
0
||
strcmp
(
pConn
->
pUser
->
user
,
pConn
->
pAcct
->
user
)
==
0
)
{
...
...
@@ -1168,8 +1168,8 @@ int mgmtProcessConnectMsg(char *pMsg, int msgLen, SConnObj *pConn) {
if
(
pConn
->
pAcct
)
{
mgmtRemoveConnFromAcct
(
pConn
);
__sync_fetch_and_sub
(
&
mgmtShellConns
,
1
);
__sync_fetch_and_sub
(
&
sdbExtConns
,
1
);
atomic_fetch_sub_32
(
&
mgmtShellConns
,
1
);
atomic_fetch_sub_32
(
&
sdbExtConns
,
1
);
}
code
=
0
;
...
...
@@ -1227,8 +1227,8 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
if
(
msg
==
NULL
)
{
if
(
pConn
)
{
mgmtRemoveConnFromAcct
(
pConn
);
__sync_fetch_and_sub
(
&
mgmtShellConns
,
1
);
__sync_fetch_and_sub
(
&
sdbExtConns
,
1
);
atomic_fetch_sub_32
(
&
mgmtShellConns
,
1
);
atomic_fetch_sub_32
(
&
sdbExtConns
,
1
);
mTrace
(
"connection from %s is closed"
,
pConn
->
pUser
->
user
);
memset
(
pConn
,
0
,
sizeof
(
SConnObj
));
}
...
...
src/system/detail/src/vnodeCache.c
浏览文件 @
9b754d70
...
...
@@ -256,7 +256,7 @@ void vnodeUpdateCommitInfo(SMeterObj *pObj, int slot, int pos, uint64_t count) {
tslot
=
(
tslot
+
1
)
%
pInfo
->
maxBlocks
;
}
__sync_fetch_and_add
(
&
pObj
->
freePoints
,
pObj
->
pointsPerBlock
*
slots
);
atomic_fetch_add_32
(
&
pObj
->
freePoints
,
pObj
->
pointsPerBlock
*
slots
);
pInfo
->
commitSlot
=
slot
;
pInfo
->
commitPoint
=
pos
;
pObj
->
commitCount
=
count
;
...
...
@@ -505,7 +505,7 @@ int vnodeInsertPointToCache(SMeterObj *pObj, char *pData) {
pData
+=
pObj
->
schema
[
col
].
bytes
;
}
__sync_fetch_and_sub
(
&
pObj
->
freePoints
,
1
);
atomic_fetch_sub_32
(
&
pObj
->
freePoints
,
1
);
pCacheBlock
->
numOfPoints
++
;
pPool
->
count
++
;
...
...
@@ -1114,7 +1114,7 @@ int vnodeSyncRestoreCache(int vnode, int fd) {
for
(
int
col
=
0
;
col
<
pObj
->
numOfColumns
;
++
col
)
if
(
taosReadMsg
(
fd
,
pBlock
->
offset
[
col
],
pObj
->
schema
[
col
].
bytes
*
points
)
<=
0
)
return
-
1
;
__sync_fetch_and_sub
(
&
pObj
->
freePoints
,
points
);
atomic_fetch_sub_32
(
&
pObj
->
freePoints
,
points
);
blocksReceived
++
;
pointsReceived
+=
points
;
pObj
->
lastKey
=
*
((
TSKEY
*
)(
pBlock
->
offset
[
0
]
+
pObj
->
schema
[
0
].
bytes
*
(
points
-
1
)));
...
...
src/system/detail/src/vnodeFile.c
浏览文件 @
9b754d70
...
...
@@ -410,7 +410,7 @@ void vnodeRemoveFile(int vnode, int fileId) {
int
fd
=
open
(
headName
,
O_RDWR
|
O_CREAT
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
fd
>
0
)
{
vnodeGetHeadFileHeaderInfo
(
fd
,
&
headInfo
);
__sync_fetch_and_add
(
&
(
pVnode
->
vnodeStatistic
.
totalStorage
),
-
headInfo
.
totalStorage
);
atomic_fetch_add_64
(
&
(
pVnode
->
vnodeStatistic
.
totalStorage
),
-
headInfo
.
totalStorage
);
close
(
fd
);
}
...
...
src/system/detail/src/vnodeImport.c
浏览文件 @
9b754d70
...
...
@@ -497,7 +497,7 @@ int vnodeImportToFile(SImportInfo *pImport) {
pInfo
->
commitPoint
=
0
;
pCacheBlock
->
numOfPoints
=
points
;
if
(
slot
==
pInfo
->
currentSlot
)
{
__sync_fetch_and_add
(
&
pObj
->
freePoints
,
pInfo
->
commitPoint
);
atomic_fetch_add_32
(
&
pObj
->
freePoints
,
pInfo
->
commitPoint
);
}
}
else
{
// if last block is full and committed
...
...
@@ -625,7 +625,7 @@ int vnodeImportToCache(SImportInfo *pImport, char *payload, int rows) {
}
code
=
0
;
__sync_fetch_and_sub
(
&
pObj
->
freePoints
,
rows
);
atomic_fetch_sub_32
(
&
pObj
->
freePoints
,
rows
);
dTrace
(
"vid:%d sid:%d id:%s, %d rows data are imported to cache"
,
pObj
->
vnode
,
pObj
->
sid
,
pObj
->
meterId
,
rows
);
_exit:
...
...
src/system/detail/src/vnodeMeter.c
浏览文件 @
9b754d70
...
...
@@ -643,8 +643,8 @@ int vnodeInsertPoints(SMeterObj *pObj, char *cont, int contLen, char source, voi
pData
+=
pObj
->
bytesPerPoint
;
points
++
;
}
__sync_fetch_and_add
(
&
(
pVnode
->
vnodeStatistic
.
pointsWritten
),
points
*
(
pObj
->
numOfColumns
-
1
));
__sync_fetch_and_add
(
&
(
pVnode
->
vnodeStatistic
.
totalStorage
),
points
*
pObj
->
bytesPerPoint
);
atomic_fetch_add_64
(
&
(
pVnode
->
vnodeStatistic
.
pointsWritten
),
points
*
(
pObj
->
numOfColumns
-
1
));
atomic_fetch_add_64
(
&
(
pVnode
->
vnodeStatistic
.
totalStorage
),
points
*
pObj
->
bytesPerPoint
);
pthread_mutex_lock
(
&
(
pVnode
->
vmutex
));
...
...
src/system/detail/src/vnodeQueryImpl.c
浏览文件 @
9b754d70
...
...
@@ -3889,14 +3889,14 @@ void vnodeDecMeterRefcnt(SQInfo *pQInfo) {
SMeterQuerySupportObj
*
pSupporter
=
pQInfo
->
pMeterQuerySupporter
;
if
(
pSupporter
==
NULL
||
pSupporter
->
numOfMeters
==
1
)
{
__sync_fetch_and_sub
(
&
pQInfo
->
pObj
->
numOfQueries
,
1
);
atomic_fetch_sub_32
(
&
pQInfo
->
pObj
->
numOfQueries
,
1
);
dTrace
(
"QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d"
,
pQInfo
,
pQInfo
->
pObj
->
vnode
,
pQInfo
->
pObj
->
sid
,
pQInfo
->
pObj
->
meterId
,
pQInfo
->
pObj
->
numOfQueries
);
}
else
{
int32_t
num
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSupporter
->
numOfMeters
;
++
i
)
{
SMeterObj
*
pMeter
=
getMeterObj
(
pSupporter
->
pMeterObj
,
pSupporter
->
pSidSet
->
pSids
[
i
]
->
sid
);
__sync_fetch_and_sub
(
&
(
pMeter
->
numOfQueries
),
1
);
atomic_fetch_sub_32
(
&
(
pMeter
->
numOfQueries
),
1
);
if
(
pMeter
->
numOfQueries
>
0
)
{
dTrace
(
"QInfo:%p vid:%d sid:%d meterId:%s, query is over, numOfQueries:%d"
,
pQInfo
,
pMeter
->
vnode
,
pMeter
->
sid
,
...
...
src/system/detail/src/vnodeShell.c
浏览文件 @
9b754d70
...
...
@@ -365,7 +365,7 @@ _query_over:
vnodeFreeColumnInfo
(
&
pQueryMsg
->
colList
[
i
]);
}
__sync_fetch_and_add
(
&
vnodeSelectReqNum
,
1
);
atomic_fetch_add_32
(
&
vnodeSelectReqNum
,
1
);
return
ret
;
}
...
...
@@ -592,6 +592,6 @@ _submit_over:
// for import, send the submit response only when return code is not zero
if
(
pSubmit
->
import
==
0
||
code
!=
0
)
ret
=
vnodeSendShellSubmitRspMsg
(
pObj
,
code
,
numOfTotalPoints
);
__sync_fetch_and_add
(
&
vnodeInsertReqNum
,
1
);
atomic_fetch_add_32
(
&
vnodeInsertReqNum
,
1
);
return
ret
;
}
src/system/detail/src/vnodeStore.c
浏览文件 @
9b754d70
...
...
@@ -351,7 +351,7 @@ void vnodeCalcOpenVnodes() {
openVnodes
++
;
}
__sync_val_compare_and_swap
(
&
tsOpenVnodes
,
tsOpenVnodes
,
openVnodes
);
atomic_store_32
(
&
tsOpenVnodes
,
openVnodes
);
}
void
vnodeUpdateHeadFile
(
int
vnode
,
int
oldTables
,
int
newTables
)
{
...
...
src/system/detail/src/vnodeUtil.c
浏览文件 @
9b754d70
...
...
@@ -567,7 +567,7 @@ int32_t vnodeIncQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterSidExtInfo** pSid
* check if the numOfQueries is 0 or not.
*/
pMeterObjList
[(
*
numOfInc
)
++
]
=
pMeter
;
__sync_fetch_and_add
(
&
pMeter
->
numOfQueries
,
1
);
atomic_fetch_add_32
(
&
pMeter
->
numOfQueries
,
1
);
// output for meter more than one query executed
if
(
pMeter
->
numOfQueries
>
1
)
{
...
...
@@ -591,7 +591,7 @@ void vnodeDecQueryRefCount(SQueryMeterMsg* pQueryMsg, SMeterObj** pMeterObjList,
SMeterObj
*
pMeter
=
pMeterObjList
[
i
];
if
(
pMeter
!=
NULL
)
{
// here, do not need to lock to perform operations
__sync_fetch_and_sub
(
&
pMeter
->
numOfQueries
,
1
);
atomic_fetch_sub_32
(
&
pMeter
->
numOfQueries
,
1
);
if
(
pMeter
->
numOfQueries
>
0
)
{
dTrace
(
"qmsg:%p, vid:%d sid:%d id:%s dec query ref, numOfQueries:%d"
,
pQueryMsg
,
pMeter
->
vnode
,
pMeter
->
sid
,
...
...
@@ -646,7 +646,7 @@ void vnodeUpdateQueryColumnIndex(SQuery* pQuery, SMeterObj* pMeterObj) {
}
int32_t
vnodeSetMeterState
(
SMeterObj
*
pMeterObj
,
int32_t
state
)
{
return
__sync_val_compare_and_swap
(
&
pMeterObj
->
state
,
TSDB_METER_STATE_READY
,
state
);
return
atomic_val_compare_exchange_32
(
&
pMeterObj
->
state
,
TSDB_METER_STATE_READY
,
state
);
}
void
vnodeClearMeterState
(
SMeterObj
*
pMeterObj
,
int32_t
state
)
{
...
...
src/util/src/tcache.c
浏览文件 @
9b754d70
...
...
@@ -516,7 +516,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
pNewNode
->
addTime
=
taosGetTimestampMs
();
pNewNode
->
time
=
pNewNode
->
addTime
+
keepTime
;
__sync_add_an
d_fetch_32
(
&
pNewNode
->
refCount
,
1
);
atomic_ad
d_fetch_32
(
&
pNewNode
->
refCount
,
1
);
// the address of this node may be changed, so the prev and next element should update the corresponding pointer
taosUpdateInHashTable
(
pObj
,
pNewNode
);
...
...
@@ -529,7 +529,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
return
NULL
;
}
__sync_add_an
d_fetch_32
(
&
pNewNode
->
refCount
,
1
);
atomic_ad
d_fetch_32
(
&
pNewNode
->
refCount
,
1
);
assert
(
hashVal
==
(
*
pObj
->
hashFp
)(
key
,
keyLen
-
1
));
pNewNode
->
hashVal
=
hashVal
;
...
...
@@ -558,7 +558,7 @@ static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, ui
return
NULL
;
}
__sync_add_an
d_fetch_32
(
&
pNode
->
refCount
,
1
);
atomic_ad
d_fetch_32
(
&
pNode
->
refCount
,
1
);
pNode
->
hashVal
=
(
*
pObj
->
hashFp
)(
key
,
keyLen
-
1
);
taosAddNodeToHashTable
(
pObj
,
pNode
);
...
...
@@ -616,7 +616,7 @@ static FORCE_INLINE void taosDecRef(SDataNode *pNode) {
}
if
(
pNode
->
refCount
>
0
)
{
__sync_sub_and
_fetch_32
(
&
pNode
->
refCount
,
1
);
atomic_sub
_fetch_32
(
&
pNode
->
refCount
,
1
);
pTrace
(
"key:%s is released by app.refcnt:%d"
,
pNode
->
key
,
pNode
->
refCount
);
}
else
{
/*
...
...
@@ -676,20 +676,20 @@ void *taosGetDataFromCache(void *handle, char *key) {
SDataNode
*
ptNode
=
taosGetNodeFromHashTable
(
handle
,
key
,
keyLen
);
if
(
ptNode
!=
NULL
)
{
__sync_add_an
d_fetch_32
(
&
ptNode
->
refCount
,
1
);
atomic_ad
d_fetch_32
(
&
ptNode
->
refCount
,
1
);
}
__cache_unlock
(
pObj
);
if
(
ptNode
!=
NULL
)
{
__sync_add_an
d_fetch_32
(
&
pObj
->
statistics
.
hitCount
,
1
);
atomic_ad
d_fetch_32
(
&
pObj
->
statistics
.
hitCount
,
1
);
pTrace
(
"key:%s is retrieved from cache,refcnt:%d"
,
key
,
ptNode
->
refCount
);
}
else
{
__sync_add_an
d_fetch_32
(
&
pObj
->
statistics
.
missCount
,
1
);
atomic_ad
d_fetch_32
(
&
pObj
->
statistics
.
missCount
,
1
);
pTrace
(
"key:%s not in cache,retrieved failed"
,
key
);
}
__sync_add_an
d_fetch_32
(
&
pObj
->
statistics
.
totalAccess
,
1
);
atomic_ad
d_fetch_32
(
&
pObj
->
statistics
.
totalAccess
,
1
);
return
(
ptNode
!=
NULL
)
?
ptNode
->
data
:
NULL
;
}
...
...
src/util/src/textbuffer.c
浏览文件 @
9b754d70
...
...
@@ -60,7 +60,7 @@ void getTmpfilePath(const char *fileNamePrefix, char *dstPath) {
strcat
(
tmpPath
,
fileNamePrefix
);
strcat
(
tmpPath
,
"-%u-%u"
);
snprintf
(
dstPath
,
MAX_TMPFILE_PATH_LENGTH
,
tmpPath
,
taosGetPthreadId
(),
__sync_add_an
d_fetch_32
(
&
tmpFileSerialNum
,
1
));
snprintf
(
dstPath
,
MAX_TMPFILE_PATH_LENGTH
,
tmpPath
,
taosGetPthreadId
(),
atomic_ad
d_fetch_32
(
&
tmpFileSerialNum
,
1
));
}
/*
...
...
src/util/src/tlog.c
浏览文件 @
9b754d70
...
...
@@ -381,7 +381,7 @@ void tprintf(const char *const flags, int dflag, const char *const format, ...)
}
if
(
taosLogMaxLines
>
0
)
{
__sync_add_an
d_fetch_32
(
&
taosLogLines
,
1
);
atomic_ad
d_fetch_32
(
&
taosLogLines
,
1
);
if
((
taosLogLines
>
taosLogMaxLines
)
&&
(
openInProgress
==
0
))
taosOpenNewLogFile
();
}
...
...
@@ -458,7 +458,7 @@ void taosPrintLongString(const char *const flags, int dflag, const char *const f
taosPushLogBuffer
(
logHandle
,
buffer
,
len
);
if
(
taosLogMaxLines
>
0
)
{
__sync_add_an
d_fetch_32
(
&
taosLogLines
,
1
);
atomic_ad
d_fetch_32
(
&
taosLogLines
,
1
);
if
((
taosLogLines
>
taosLogMaxLines
)
&&
(
openInProgress
==
0
))
taosOpenNewLogFile
();
}
...
...
src/util/src/ttimer.c
浏览文件 @
9b754d70
...
...
@@ -105,15 +105,15 @@ static timer_map_t timerMap;
static
uintptr_t
getNextTimerId
()
{
uintptr_t
id
;
do
{
id
=
__sync_add_an
d_fetch_ptr
(
&
nextTimerId
,
1
);
id
=
atomic_ad
d_fetch_ptr
(
&
nextTimerId
,
1
);
}
while
(
id
==
0
);
return
id
;
}
static
void
timerAddRef
(
tmr_obj_t
*
timer
)
{
__sync_add_an
d_fetch_8
(
&
timer
->
refCount
,
1
);
}
static
void
timerAddRef
(
tmr_obj_t
*
timer
)
{
atomic_ad
d_fetch_8
(
&
timer
->
refCount
,
1
);
}
static
void
timerDecRef
(
tmr_obj_t
*
timer
)
{
if
(
__sync_sub_and
_fetch_8
(
&
timer
->
refCount
,
1
)
==
0
)
{
if
(
atomic_sub
_fetch_8
(
&
timer
->
refCount
,
1
)
==
0
)
{
free
(
timer
);
}
}
...
...
@@ -121,7 +121,7 @@ static void timerDecRef(tmr_obj_t* timer) {
static
void
lockTimerList
(
timer_list_t
*
list
)
{
int64_t
tid
=
taosGetPthreadId
();
int
i
=
0
;
while
(
__sync_val_compare_and_swap
_64
(
&
(
list
->
lockedBy
),
0
,
tid
)
!=
0
)
{
while
(
atomic_val_compare_exchange
_64
(
&
(
list
->
lockedBy
),
0
,
tid
)
!=
0
)
{
if
(
++
i
%
1000
==
0
)
{
sched_yield
();
}
...
...
@@ -130,7 +130,7 @@ static void lockTimerList(timer_list_t* list) {
static
void
unlockTimerList
(
timer_list_t
*
list
)
{
int64_t
tid
=
taosGetPthreadId
();
if
(
__sync_val_compare_and_swap
_64
(
&
(
list
->
lockedBy
),
tid
,
0
)
!=
tid
)
{
if
(
atomic_val_compare_exchange
_64
(
&
(
list
->
lockedBy
),
tid
,
0
)
!=
tid
)
{
assert
(
false
);
tmrError
(
"%d trying to unlock a timer list not locked by current thread."
,
tid
);
}
...
...
@@ -257,7 +257,7 @@ static bool removeFromWheel(tmr_obj_t* timer) {
static
void
processExpiredTimer
(
void
*
handle
,
void
*
arg
)
{
tmr_obj_t
*
timer
=
(
tmr_obj_t
*
)
handle
;
timer
->
executedBy
=
taosGetPthreadId
();
uint8_t
state
=
__sync_val_compare_and_swap
_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_EXPIRED
);
uint8_t
state
=
atomic_val_compare_exchange
_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_EXPIRED
);
if
(
state
==
TIMER_STATE_WAITING
)
{
const
char
*
fmt
=
"%s timer[id=%lld, fp=%p, param=%p] execution start."
;
tmrTrace
(
fmt
,
timer
->
ctrl
->
label
,
timer
->
id
,
timer
->
fp
,
timer
->
param
);
...
...
@@ -431,7 +431,7 @@ bool taosTmrStop(tmr_h timerId) {
return
false
;
}
uint8_t
state
=
__sync_val_compare_and_swap
_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_CANCELED
);
uint8_t
state
=
atomic_val_compare_exchange
_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_CANCELED
);
doStopTimer
(
timer
,
state
);
timerDecRef
(
timer
);
...
...
@@ -456,7 +456,7 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle,
if
(
timer
==
NULL
)
{
tmrTrace
(
"%s timer[id=%lld] does not exist"
,
ctrl
->
label
,
id
);
}
else
{
uint8_t
state
=
__sync_val_compare_and_swap
_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_CANCELED
);
uint8_t
state
=
atomic_val_compare_exchange
_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_CANCELED
);
if
(
!
doStopTimer
(
timer
,
state
))
{
timerDecRef
(
timer
);
timer
=
NULL
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录