Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
51943b6f
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
51943b6f
编写于
11月 08, 2019
作者:
S
slguan
提交者:
GitHub
11月 08, 2019
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #684 from taosdata/feature/newtimer
new timer implementation
上级
1c3fcabe
82ce093a
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
592 addition
and
484 deletion
+592
-484
src/inc/ttimer.h
src/inc/ttimer.h
+16
-15
src/os/darwin/inc/os.h
src/os/darwin/inc/os.h
+34
-0
src/os/linux/inc/os.h
src/os/linux/inc/os.h
+29
-0
src/os/windows/inc/os.h
src/os/windows/inc/os.h
+70
-6
src/os/windows/src/twindows.c
src/os/windows/src/twindows.c
+13
-17
src/util/src/ttimer.c
src/util/src/ttimer.c
+430
-446
未找到文件。
src/inc/ttimer.h
浏览文件 @
51943b6f
...
...
@@ -21,40 +21,41 @@ extern "C" {
#endif
typedef
void
*
tmr_h
;
typedef
void
(
*
TAOS_TMR_CALLBACK
)(
void
*
,
void
*
);
extern
uint32_t
tmrDebugFlag
;
extern
int
taosTmrThreads
;
extern
int
taosTmrThreads
;
#define tmrError(...) \
if (tmrDebugFlag & DEBUG_ERROR) {
\
do { if (tmrDebugFlag & DEBUG_ERROR) {
\
tprintf("ERROR TMR ", tmrDebugFlag, __VA_ARGS__); \
}
} } while(0)
#define tmrWarn(...) \
if (tmrDebugFlag & DEBUG_WARN) {
\
do { if (tmrDebugFlag & DEBUG_WARN) {
\
tprintf("WARN TMR ", tmrDebugFlag, __VA_ARGS__); \
}
} } while(0)
#define tmrTrace(...) \
if (tmrDebugFlag & DEBUG_TRACE) {
\
do { if (tmrDebugFlag & DEBUG_TRACE) {
\
tprintf("TMR ", tmrDebugFlag, __VA_ARGS__); \
}
}
} while(0)
#define MAX_NUM_OF_TMRCTL
51
2
#define MAX_NUM_OF_TMRCTL
3
2
#define MSECONDS_PER_TICK 5
void
*
taosTmrInit
(
int
maxTmr
,
int
resoultion
,
int
longest
,
char
*
label
);
void
*
taosTmrInit
(
int
maxTmr
,
int
resoultion
,
int
longest
,
c
onst
c
har
*
label
);
tmr_h
taosTmrStart
(
void
(
*
fp
)(
void
*
,
void
*
),
int
mseconds
,
void
*
param1
,
void
*
handle
);
tmr_h
taosTmrStart
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
,
void
*
handle
);
void
taosTmrStop
(
tmr_h
tmrId
);
bool
taosTmrStop
(
tmr_h
tmrId
);
void
taosTmrStopA
(
tmr_h
*
timerId
);
bool
taosTmrStopA
(
tmr_h
*
timerId
);
void
taosTmrReset
(
void
(
*
fp
)(
void
*
,
void
*
),
int
mseconds
,
void
*
param1
,
void
*
handle
,
tmr_h
*
pTmrId
);
bool
taosTmrReset
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
,
void
*
handle
,
tmr_h
*
pTmrId
);
void
taosTmrCleanUp
(
void
*
handle
);
void
taosTmrList
(
void
*
handle
);
#ifdef __cplusplus
}
#endif
...
...
src/os/darwin/inc/os.h
浏览文件 @
51943b6f
...
...
@@ -55,10 +55,44 @@
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_64(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_ptr(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_store_8(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_16(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_32(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_64(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_ptr(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_8(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_16(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_32(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins.
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_add_and_fetch_16 __sync_add_and_fetch
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch
int32_t
__sync_val_load_32
(
int32_t
*
ptr
);
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
);
...
...
src/os/linux/inc/os.h
浏览文件 @
51943b6f
...
...
@@ -71,14 +71,43 @@ extern "C" {
#define taosWriteSocket(fd, buf, len) write(fd, buf, len)
#define taosReadSocket(fd, buf, len) read(fd, buf, len)
#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_64(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_load_ptr(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define atomic_store_8(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_16(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_32(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_64(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_store_ptr(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_8(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_16(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_32(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST)
// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler
// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins.
#define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap
#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap
#define __sync_add_and_fetch_64 __sync_add_and_fetch
#define __sync_add_and_fetch_32 __sync_add_and_fetch
#define __sync_add_and_fetch_16 __sync_add_and_fetch
#define __sync_add_and_fetch_8 __sync_add_and_fetch
#define __sync_add_and_fetch_ptr __sync_add_and_fetch
#define __sync_sub_and_fetch_64 __sync_sub_and_fetch
#define __sync_sub_and_fetch_32 __sync_sub_and_fetch
#define __sync_sub_and_fetch_16 __sync_sub_and_fetch
#define __sync_sub_and_fetch_8 __sync_sub_and_fetch
#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch
int32_t
__sync_val_load_32
(
int32_t
*
ptr
);
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
);
...
...
src/os/windows/inc/os.h
浏览文件 @
51943b6f
...
...
@@ -29,6 +29,7 @@
#include <math.h>
#include <string.h>
#include <assert.h>
#include <intrin.h>
#ifdef __cplusplus
extern
"C"
{
...
...
@@ -78,12 +79,75 @@ extern "C" {
#define taosWriteSocket(fd, buf, len) send(fd, buf, len, 0)
#define taosReadSocket(fd, buf, len) recv(fd, buf, len, 0)
int32_t
__sync_val_compare_and_swap_32
(
int32_t
*
ptr
,
int32_t
oldval
,
int32_t
newval
);
int32_t
__sync_add_and_fetch_32
(
int32_t
*
ptr
,
int32_t
val
);
int32_t
__sync_sub_and_fetch_32
(
int32_t
*
ptr
,
int32_t
val
);
int64_t
__sync_val_compare_and_swap_64
(
int64_t
*
ptr
,
int64_t
oldval
,
int64_t
newval
);
int64_t
__sync_add_and_fetch_64
(
int64_t
*
ptr
,
int64_t
val
);
int64_t
__sync_sub_and_fetch_64
(
int64_t
*
ptr
,
int64_t
val
);
#if defined(_M_ARM) || defined(_M_ARM64)
#define atomic_load_8(ptr) __iso_volatile_load8((const volatile __int8*)(ptr))
#define atomic_load_16(ptr) __iso_volatile_load16((const volatile __int16*)(ptr))
#define atomic_load_32(ptr) __iso_volatile_load32((const volatile __int32*)(ptr))
#define atomic_load_64(ptr) __iso_volatile_load64((const volatile __int64*)(ptr))
#define atomic_store_8(ptr, val) __iso_volatile_store8((volatile __int8*)(ptr), (__int8)(val))
#define atomic_store_16(ptr, val) __iso_volatile_store16((volatile __int16*)(ptr), (__int16)(val))
#define atomic_store_32(ptr, val) __iso_volatile_store32((volatile __int32*)(ptr), (__int32)(val))
#define atomic_store_64(ptr, val) __iso_volatile_store64((volatile __int64*)(ptr), (__int64)(val))
#ifdef _M_ARM64
#define atomic_load_ptr atomic_load_64
#define atomic_store_ptr atomic_store_64
#else
#define atomic_load_ptr atomic_load_32
#define atomic_store_ptr atomic_store_32
#endif
#else
#define atomic_load_8(ptr) (*(char volatile*)(ptr))
#define atomic_load_16(ptr) (*(short volatile*)(ptr))
#define atomic_load_32(ptr) (*(long volatile*)(ptr))
#define atomic_load_64(ptr) (*(__int64 volatile*)(ptr))
#define atomic_load_ptr(ptr) (*(void* volatile*)(ptr))
#define atomic_store_8(ptr, val) ((*(char volatile*)(ptr)) = (char)(val))
#define atomic_store_16(ptr, val) ((*(short volatile*)(ptr)) = (short)(val))
#define atomic_store_32(ptr, val) ((*(long volatile*)(ptr)) = (long)(val))
#define atomic_store_64(ptr, val) ((*(__int64 volatile*)(ptr)) = (__int64)(val))
#define atomic_store_ptr(ptr, val) ((*(void* volatile*)(ptr)) = (void*)(val))
#endif
#define atomic_exchange_8(ptr, val) _InterlockedExchange8((char volatile*)(ptr), (char)(val))
#define atomic_exchange_16(ptr, val) _InterlockedExchange16((short volatile*)(ptr), (short)(val))
#define atomic_exchange_32(ptr, val) _InterlockedExchange((long volatile*)(ptr), (long)(val))
#define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val))
#define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val))
#define __sync_val_compare_and_swap_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval))
#define __sync_val_compare_and_swap_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval))
#define __sync_val_compare_and_swap_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval))
#define __sync_val_compare_and_swap_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval))
#define __sync_val_compare_and_swap_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval))
char
interlocked_add_8
(
char
volatile
*
ptr
,
char
val
);
short
interlocked_add_16
(
short
volatile
*
ptr
,
short
val
);
long
interlocked_add_32
(
long
volatile
*
ptr
,
long
val
);
__int64
interlocked_add_64
(
__int64
volatile
*
ptr
,
__int64
val
);
#define __sync_add_and_fetch_8(ptr, val) interlocked_add_8((char volatile*)(ptr), (char)(val))
#define __sync_add_and_fetch_16(ptr, val) interlocked_add_16((short volatile*)(ptr), (short)(val))
#define __sync_add_and_fetch_32(ptr, val) interlocked_add_32((long volatile*)(ptr), (long)(val))
#define __sync_add_and_fetch_64(ptr, val) interlocked_add_64((__int64 volatile*)(ptr), (__int64)(val))
#ifdef _WIN64
#define __sync_add_and_fetch_ptr __sync_add_and_fetch_64
#else
#define __sync_add_and_fetch_ptr __sync_add_and_fetch_32
#endif
#define __sync_sub_and_fetch_8(ptr, val) __sync_add_and_fetch_8((ptr), -(val))
#define __sync_sub_and_fetch_16(ptr, val) __sync_add_and_fetch_16((ptr), -(val))
#define __sync_sub_and_fetch_32(ptr, val) __sync_add_and_fetch_32((ptr), -(val))
#define __sync_sub_and_fetch_64(ptr, val) __sync_add_and_fetch_64((ptr), -(val))
#define __sync_sub_and_fetch_ptr(ptr, val) __sync_add_and_fetch_ptr((ptr), -(val))
int32_t
__sync_val_load_32
(
int32_t
*
ptr
);
void
__sync_val_restore_32
(
int32_t
*
ptr
,
int32_t
newval
);
...
...
src/os/windows/src/twindows.c
浏览文件 @
51943b6f
...
...
@@ -43,8 +43,11 @@ void taosResetPthread(pthread_t *thread) {
}
int64_t
taosGetPthreadId
()
{
pthread_t
id
=
pthread_self
();
return
(
int64_t
)
id
.
p
;
#ifdef PTW32_VERSION
return
pthread_getw32threadid_np
(
pthread_self
());
#else
return
(
int64_t
)
pthread_self
();
#endif
}
int
taosSetSockOpt
(
int
socketfd
,
int
level
,
int
optname
,
void
*
optval
,
int
optlen
)
{
...
...
@@ -63,28 +66,21 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle
return
setsockopt
(
socketfd
,
level
,
optname
,
optval
,
optlen
);
}
int32_t
__sync_val_compare_and_swap_32
(
int32_t
*
ptr
,
int32_t
oldval
,
int32_t
newval
)
{
return
InterlockedCompareExchange
(
ptr
,
newval
,
oldval
);
}
int32_t
__sync_add_and_fetch_32
(
int32_t
*
ptr
,
int32_t
val
)
{
return
InterlockedAdd
(
ptr
,
val
);
}
int32_t
__sync_sub_and_fetch_32
(
int32_t
*
ptr
,
int32_t
val
)
{
return
InterlockedAdd
(
ptr
,
-
val
)
;
char
interlocked_add_8
(
char
volatile
*
ptr
,
char
val
)
{
return
_InterlockedExchangeAdd8
(
ptr
,
val
)
+
val
;
}
int64_t
__sync_val_compare_and_swap_64
(
int64_t
*
ptr
,
int64_t
oldval
,
int64_t
new
val
)
{
return
InterlockedCompareExchange64
(
ptr
,
newval
,
oldval
)
;
short
interlocked_add_16
(
short
volatile
*
ptr
,
short
val
)
{
return
_InterlockedExchangeAdd16
(
ptr
,
val
)
+
val
;
}
int64_t
__sync_add_and_fetch_64
(
int64_t
*
ptr
,
int64_t
val
)
{
return
InterlockedAdd64
(
ptr
,
val
)
;
long
interlocked_add_32
(
long
volatile
*
ptr
,
long
val
)
{
return
_InterlockedExchangeAdd
(
ptr
,
val
)
+
val
;
}
int64_t
__sync_sub_and_fetch_64
(
int64_t
*
ptr
,
int64_t
val
)
{
return
InterlockedAdd64
(
ptr
,
-
val
)
;
__int64
interlocked_add_64
(
__int64
volatile
*
ptr
,
__int64
val
)
{
return
_InterlockedExchangeAdd64
(
ptr
,
val
)
+
val
;
}
int32_t
__sync_val_load_32
(
int32_t
*
ptr
)
{
...
...
src/util/src/ttimer.c
浏览文件 @
51943b6f
...
...
@@ -16,555 +16,539 @@
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <signal.h>
#include <sched.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "os.h"
#include "tidpool.h"
#include "tlog.h"
#include "tsched.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
// special mempool without mutex
#define mpool_h void *
typedef
struct
{
int
numOfFree
;
/* number of free slots */
int
first
;
/* the first free slot */
int
numOfBlock
;
/* the number of blocks */
int
blockSize
;
/* block size in bytes */
int
*
freeList
;
/* the index list */
char
*
pool
;
/* the actual mem block */
}
pool_t
;
mpool_h
tmrMemPoolInit
(
int
maxNum
,
int
blockSize
);
char
*
tmrMemPoolMalloc
(
mpool_h
handle
);
void
tmrMemPoolFree
(
mpool_h
handle
,
char
*
p
);
void
tmrMemPoolCleanUp
(
mpool_h
handle
);
typedef
struct
_tmr_obj
{
void
*
param1
;
void
(
*
fp
)(
void
*
,
void
*
);
tmr_h
timerId
;
short
cycle
;
struct
_tmr_obj
*
prev
;
struct
_tmr_obj
*
next
;
int
index
;
struct
_tmr_ctrl_t
*
pCtrl
;
}
tmr_obj_t
;
typedef
struct
{
tmr_obj_t
*
head
;
int
count
;
}
tmr_list_t
;
typedef
struct
_tmr_ctrl_t
{
void
*
signature
;
pthread_mutex_t
mutex
;
/* mutex to protect critical resource */
int
resolution
;
/* resolution in mseconds */
int
numOfPeriods
;
/* total number of periods */
int64_t
periodsFromStart
;
/* count number of periods since start */
pthread_t
thread
;
/* timer thread ID */
tmr_list_t
*
tmrList
;
mpool_h
poolHandle
;
char
label
[
12
];
int
maxNumOfTmrs
;
int
numOfTmrs
;
int
ticks
;
int
maxTicks
;
int
tmrCtrlId
;
}
tmr_ctrl_t
;
uint32_t
tmrDebugFlag
=
DEBUG_ERROR
|
DEBUG_WARN
|
DEBUG_FILE
;
void
taosTmrProcessList
(
tmr_ctrl_t
*
);
tmr_ctrl_t
tmrCtrl
[
MAX_NUM_OF_TMRCTL
];
int
numOfTmrCtrl
=
0
;
void
*
tmrIdPool
=
NULL
;
void
*
tmrQhandle
;
int
taosTmrThreads
=
1
;
void
taosTimerLoopFunc
(
int
signo
)
{
tmr_ctrl_t
*
pCtrl
;
int
count
=
0
;
for
(
int
i
=
1
;
i
<
MAX_NUM_OF_TMRCTL
;
++
i
)
{
pCtrl
=
tmrCtrl
+
i
;
if
(
pCtrl
->
signature
)
{
count
++
;
pCtrl
->
ticks
++
;
if
(
pCtrl
->
ticks
>=
pCtrl
->
maxTicks
)
{
taosTmrProcessList
(
pCtrl
);
pCtrl
->
ticks
=
0
;
}
if
(
count
>=
numOfTmrCtrl
)
break
;
}
}
}
#define TIMER_STATE_WAITING 0
#define TIMER_STATE_EXPIRED 1
#define TIMER_STATE_STOPPED 2
#define TIMER_STATE_CANCELED 3
void
taosTmrModuleInit
(
void
)
{
tmrIdPool
=
taosInitIdPool
(
MAX_NUM_OF_TMRCTL
);
memset
(
tmrCtrl
,
0
,
sizeof
(
tmrCtrl
));
typedef
union
_tmr_ctrl_t
{
char
label
[
16
];
struct
{
// pad to ensure 'next' is the end of this union
char
padding
[
16
-
sizeof
(
union
_tmr_ctrl_t
*
)];
union
_tmr_ctrl_t
*
next
;
};
}
tmr_ctrl_t
;
taosInitTimer
(
taosTimerLoopFunc
,
MSECONDS_PER_TICK
);
typedef
struct
tmr_obj_t
{
uintptr_t
id
;
tmr_ctrl_t
*
ctrl
;
struct
tmr_obj_t
*
mnext
;
struct
tmr_obj_t
*
prev
;
struct
tmr_obj_t
*
next
;
uint16_t
slot
;
uint8_t
wheel
;
uint8_t
state
;
uint8_t
refCount
;
uint8_t
reserved1
;
uint16_t
reserved2
;
union
{
int64_t
expireAt
;
int64_t
executedBy
;
};
TAOS_TMR_CALLBACK
fp
;
void
*
param
;
}
tmr_obj_t
;
tmrQhandle
=
taosInitScheduler
(
10000
,
taosTmrThreads
,
"tmr"
);
tmrTrace
(
"timer module is initialized, thread:%d"
,
taosTmrThreads
);
typedef
struct
timer_list_t
{
int64_t
lockedBy
;
tmr_obj_t
*
timers
;
}
timer_list_t
;
typedef
struct
timer_map_t
{
uint32_t
size
;
uint32_t
count
;
timer_list_t
*
slots
;
}
timer_map_t
;
typedef
struct
time_wheel_t
{
pthread_mutex_t
mutex
;
int64_t
nextScanAt
;
uint32_t
resolution
;
uint16_t
size
;
uint16_t
index
;
tmr_obj_t
**
slots
;
}
time_wheel_t
;
uint32_t
tmrDebugFlag
=
DEBUG_ERROR
|
DEBUG_WARN
|
DEBUG_FILE
;
static
pthread_once_t
tmrModuleInit
=
PTHREAD_ONCE_INIT
;
static
pthread_mutex_t
tmrCtrlMutex
;
static
tmr_ctrl_t
tmrCtrls
[
MAX_NUM_OF_TMRCTL
];
static
tmr_ctrl_t
*
unusedTmrCtrl
=
NULL
;
void
*
tmrQhandle
;
int
taosTmrThreads
=
1
;
static
uintptr_t
nextTimerId
=
0
;
static
time_wheel_t
wheels
[]
=
{
{.
resolution
=
MSECONDS_PER_TICK
,
.
size
=
4096
},
{.
resolution
=
1000
,
.
size
=
1024
},
{.
resolution
=
60000
,
.
size
=
1024
},
};
static
timer_map_t
timerMap
;
static
uintptr_t
getNextTimerId
()
{
uintptr_t
id
;
do
{
id
=
__sync_add_and_fetch_ptr
(
&
nextTimerId
,
1
);
}
while
(
id
==
0
);
return
id
;
}
void
*
taosTmrInit
(
int
maxNumOfTmrs
,
int
resolution
,
int
longest
,
char
*
label
)
{
static
pthread_once_t
tmrInit
=
PTHREAD_ONCE_INIT
;
tmr_ctrl_t
*
pCtrl
;
static
void
timerAddRef
(
tmr_obj_t
*
timer
)
{
__sync_add_and_fetch_8
(
&
timer
->
refCount
,
1
);
}
pthread_once
(
&
tmrInit
,
taosTmrModuleInit
);
int
tmrCtrlId
=
taosAllocateId
(
tmrIdPool
);
if
(
tmrCtrlId
<
0
)
{
tmrError
(
"%s bug!!! too many timers!!!"
,
label
);
return
NULL
;
}
pCtrl
=
tmrCtrl
+
tmrCtrlId
;
tfree
(
pCtrl
->
tmrList
);
tmrMemPoolCleanUp
(
pCtrl
->
poolHandle
);
memset
(
pCtrl
,
0
,
sizeof
(
tmr_ctrl_t
));
pCtrl
->
tmrCtrlId
=
tmrCtrlId
;
strcpy
(
pCtrl
->
label
,
label
);
pCtrl
->
maxNumOfTmrs
=
maxNumOfTmrs
;
if
((
pCtrl
->
poolHandle
=
tmrMemPoolInit
(
maxNumOfTmrs
+
10
,
sizeof
(
tmr_obj_t
)))
==
NULL
)
{
tmrError
(
"%s failed to allocate mem pool"
,
label
);
tmrMemPoolCleanUp
(
pCtrl
->
poolHandle
);
return
NULL
;
static
void
timerDecRef
(
tmr_obj_t
*
timer
)
{
if
(
__sync_sub_and_fetch_8
(
&
timer
->
refCount
,
1
)
==
0
)
{
free
(
timer
);
}
}
if
(
resolution
<
MSECONDS_PER_TICK
)
resolution
=
MSECONDS_PER_TICK
;
pCtrl
->
resolution
=
resolution
;
pCtrl
->
maxTicks
=
resolution
/
MSECONDS_PER_TICK
;
pCtrl
->
ticks
=
rand
()
/
pCtrl
->
maxTicks
;
pCtrl
->
numOfPeriods
=
longest
/
resolution
;
if
(
pCtrl
->
numOfPeriods
<
10
)
pCtrl
->
numOfPeriods
=
10
;
pCtrl
->
tmrList
=
(
tmr_list_t
*
)
malloc
(
sizeof
(
tmr_list_t
)
*
pCtrl
->
numOfPeriods
);
if
(
pCtrl
->
tmrList
==
NULL
)
{
tmrError
(
"%s failed to allocate(size:%d) mem for tmrList"
,
label
,
sizeof
(
tmr_list_t
)
*
pCtrl
->
numOfPeriods
);
tmrMemPoolCleanUp
(
pCtrl
->
poolHandle
);
taosTmrCleanUp
(
pCtrl
);
return
NULL
;
}
for
(
int
i
=
0
;
i
<
pCtrl
->
numOfPeriods
;
i
++
)
{
pCtrl
->
tmrList
[
i
].
head
=
NULL
;
pCtrl
->
tmrList
[
i
].
count
=
0
;
static
void
lockTimerList
(
timer_list_t
*
list
)
{
int64_t
tid
=
taosGetPthreadId
();
int
i
=
0
;
while
(
__sync_val_compare_and_swap_64
(
&
(
list
->
lockedBy
),
0
,
tid
)
!=
0
)
{
if
(
++
i
%
1000
==
0
)
{
sched_yield
();
}
}
}
if
(
pthread_mutex_init
(
&
pCtrl
->
mutex
,
NULL
)
<
0
)
{
tmrError
(
"%s failed to create the mutex, reason:%s"
,
label
,
strerror
(
errno
));
taosTmrCleanUp
(
pCtrl
);
return
NULL
;
static
void
unlockTimerList
(
timer_list_t
*
list
)
{
int64_t
tid
=
taosGetPthreadId
();
if
(
__sync_val_compare_and_swap_64
(
&
(
list
->
lockedBy
),
tid
,
0
)
!=
tid
)
{
assert
(
false
);
tmrError
(
"trying to unlock a timer list not locked by current thread."
);
}
pCtrl
->
signature
=
pCtrl
;
numOfTmrCtrl
++
;
tmrTrace
(
"%s timer ctrl is initialized, index:%d"
,
label
,
tmrCtrlId
);
return
pCtrl
;
}
void
taosTmrProcessList
(
tmr_ctrl_t
*
pCtrl
)
{
unsigned
int
index
;
tmr_list_t
*
pList
;
tmr_obj_t
*
pObj
,
*
header
;
static
void
addTimer
(
tmr_obj_t
*
timer
)
{
timerAddRef
(
timer
);
timer
->
wheel
=
tListLen
(
wheels
);
pthread_mutex_lock
(
&
pCtrl
->
mutex
);
index
=
pCtrl
->
periodsFromStart
%
pCtrl
->
numOfPeriods
;
pList
=
&
pCtrl
->
tmrList
[
index
];
uint32_t
idx
=
(
uint32_t
)(
timer
->
id
%
timerMap
.
size
);
timer_list_t
*
list
=
timerMap
.
slots
+
idx
;
while
(
1
)
{
header
=
pList
->
head
;
if
(
header
==
NULL
)
break
;
lockTimerList
(
list
);
timer
->
mnext
=
list
->
timers
;
list
->
timers
=
timer
;
unlockTimerList
(
list
);
}
if
(
header
->
cycle
>
0
)
{
pObj
=
header
;
while
(
pObj
)
{
pObj
->
cycle
--
;
pObj
=
pObj
->
next
;
static
tmr_obj_t
*
findTimer
(
uintptr_t
id
)
{
tmr_obj_t
*
timer
=
NULL
;
if
(
id
>
0
)
{
uint32_t
idx
=
(
uint32_t
)(
id
%
timerMap
.
size
);
timer_list_t
*
list
=
timerMap
.
slots
+
idx
;
lockTimerList
(
list
);
for
(
timer
=
list
->
timers
;
timer
!=
NULL
;
timer
=
timer
->
mnext
)
{
if
(
timer
->
id
==
id
)
{
timerAddRef
(
timer
);
break
;
}
break
;
}
pCtrl
->
numOfTmrs
--
;
tmrTrace
(
"%s %p, timer expired, fp:%p, tmr_h:%p, index:%d, total:%d"
,
pCtrl
->
label
,
header
->
param1
,
header
->
fp
,
header
,
index
,
pCtrl
->
numOfTmrs
);
pList
->
head
=
header
->
next
;
if
(
header
->
next
)
header
->
next
->
prev
=
NULL
;
pList
->
count
--
;
header
->
timerId
=
NULL
;
SSchedMsg
schedMsg
;
schedMsg
.
fp
=
NULL
;
schedMsg
.
tfp
=
header
->
fp
;
schedMsg
.
ahandle
=
header
->
param1
;
schedMsg
.
thandle
=
header
;
taosScheduleTask
(
tmrQhandle
,
&
schedMsg
);
tmrMemPoolFree
(
pCtrl
->
poolHandle
,
(
char
*
)
header
);
unlockTimerList
(
list
);
}
pCtrl
->
periodsFromStart
++
;
pthread_mutex_unlock
(
&
pCtrl
->
mutex
);
}
void
taosTmrCleanUp
(
void
*
handle
)
{
tmr_ctrl_t
*
pCtrl
=
(
tmr_ctrl_t
*
)
handle
;
if
(
pCtrl
==
NULL
||
pCtrl
->
signature
!=
pCtrl
)
return
;
pCtrl
->
signature
=
NULL
;
taosFreeId
(
tmrIdPool
,
pCtrl
->
tmrCtrlId
);
numOfTmrCtrl
--
;
tmrTrace
(
"%s is cleaned up, numOfTmrs:%d"
,
pCtrl
->
label
,
numOfTmrCtrl
);
return
timer
;
}
tmr_h
taosTmrStart
(
void
(
*
fp
)(
void
*
,
void
*
),
int
mseconds
,
void
*
param1
,
void
*
handle
)
{
tmr_obj_t
*
pObj
,
*
cNode
,
*
pNode
;
tmr_list_t
*
pList
;
int
index
,
period
;
tmr_ctrl_t
*
pCtrl
=
(
tmr_ctrl_t
*
)
handle
;
if
(
handle
==
NULL
)
return
NULL
;
period
=
mseconds
/
pCtrl
->
resolution
;
if
(
pthread_mutex_lock
(
&
pCtrl
->
mutex
)
!=
0
)
tmrError
(
"%s mutex lock failed, reason:%s"
,
pCtrl
->
label
,
strerror
(
errno
));
pObj
=
(
tmr_obj_t
*
)
tmrMemPoolMalloc
(
pCtrl
->
poolHandle
);
if
(
pObj
==
NULL
)
{
tmrError
(
"%s reach max number of timers:%d"
,
pCtrl
->
label
,
pCtrl
->
maxNumOfTmrs
);
pthread_mutex_unlock
(
&
pCtrl
->
mutex
);
return
NULL
;
static
void
removeTimer
(
uintptr_t
id
)
{
tmr_obj_t
*
prev
=
NULL
;
uint32_t
idx
=
(
uint32_t
)(
id
%
timerMap
.
size
);
timer_list_t
*
list
=
timerMap
.
slots
+
idx
;
lockTimerList
(
list
);
for
(
tmr_obj_t
*
p
=
list
->
timers
;
p
!=
NULL
;
p
=
p
->
mnext
)
{
if
(
p
->
id
==
id
)
{
if
(
prev
==
NULL
)
{
list
->
timers
=
p
->
mnext
;
}
else
{
prev
->
mnext
=
p
->
mnext
;
}
timerDecRef
(
p
);
break
;
}
prev
=
p
;
}
unlockTimerList
(
list
);
}
pObj
->
cycle
=
period
/
pCtrl
->
numOfPeriods
;
pObj
->
param1
=
param1
;
pObj
->
fp
=
fp
;
pObj
->
timerId
=
pObj
;
pObj
->
pCtrl
=
pCtrl
;
index
=
(
period
+
pCtrl
->
periodsFromStart
)
%
pCtrl
->
numOfPeriods
;
int
cindex
=
(
pCtrl
->
periodsFromStart
)
%
pCtrl
->
numOfPeriods
;
pList
=
&
(
pCtrl
->
tmrList
[
index
]);
pObj
->
index
=
index
;
cNode
=
pList
->
head
;
pNode
=
NULL
;
while
(
cNode
!=
NULL
)
{
if
(
cNode
->
cycle
<
pObj
->
cycle
)
{
pNode
=
cNode
;
cNode
=
cNode
->
next
;
}
else
{
static
void
addToWheel
(
tmr_obj_t
*
timer
,
uint32_t
delay
)
{
timerAddRef
(
timer
);
// select a wheel for the timer, we are not an accurate timer,
// but the inaccuracy should not be too large.
timer
->
wheel
=
tListLen
(
wheels
)
-
1
;
for
(
uint8_t
i
=
0
;
i
<
tListLen
(
wheels
);
i
++
)
{
time_wheel_t
*
wheel
=
wheels
+
i
;
if
(
delay
<
wheel
->
resolution
*
wheel
->
size
)
{
timer
->
wheel
=
i
;
break
;
}
}
pObj
->
next
=
cNode
;
pObj
->
prev
=
pNode
;
time_wheel_t
*
wheel
=
wheels
+
timer
->
wheel
;
timer
->
prev
=
NULL
;
timer
->
expireAt
=
taosGetTimestampMs
()
+
delay
;
if
(
cNode
!=
NULL
)
{
cNode
->
prev
=
pObj
;
}
pthread_mutex_lock
(
&
wheel
->
mutex
);
if
(
pNode
!=
NULL
)
{
pNode
->
next
=
pObj
;
}
else
{
pList
->
head
=
pObj
;
uint32_t
idx
=
0
;
if
(
timer
->
expireAt
>
wheel
->
nextScanAt
)
{
// adjust delay according to next scan time of this wheel
// so that the timer is not fired earlier than desired.
delay
=
(
uint32_t
)(
timer
->
expireAt
-
wheel
->
nextScanAt
);
idx
=
(
delay
+
wheel
->
resolution
-
1
)
/
wheel
->
resolution
;
}
pList
->
count
++
;
pCtrl
->
numOfTmrs
++
;
if
(
pthread_mutex_unlock
(
&
pCtrl
->
mutex
)
!=
0
)
tmrError
(
"%s mutex unlock failed, reason:%s"
,
pCtrl
->
label
,
strerror
(
errno
));
tmrTrace
(
"%s %p, timer started, fp:%p, tmr_h:%p, index:%d, total:%d cindex:%d"
,
pCtrl
->
label
,
param1
,
fp
,
pObj
,
index
,
pCtrl
->
numOfTmrs
,
cindex
);
timer
->
slot
=
(
uint16_t
)((
wheel
->
index
+
idx
+
1
)
%
wheel
->
size
);
tmr_obj_t
*
p
=
wheel
->
slots
[
timer
->
slot
];
wheel
->
slots
[
timer
->
slot
]
=
timer
;
timer
->
next
=
p
;
if
(
p
!=
NULL
)
{
p
->
prev
=
timer
;
}
return
(
tmr_h
)
pObj
;
pthread_mutex_unlock
(
&
wheel
->
mutex
)
;
}
void
taosTmrStop
(
tmr_h
timerId
)
{
tmr_obj_t
*
pObj
;
tmr_list_t
*
pList
;
tmr_ctrl_t
*
pCtrl
;
pObj
=
(
tmr_obj_t
*
)
timerId
;
if
(
pObj
==
NULL
)
return
;
pCtrl
=
pObj
->
pCtrl
;
if
(
pCtrl
==
NULL
)
return
;
if
(
pthread_mutex_lock
(
&
pCtrl
->
mutex
)
!=
0
)
tmrError
(
"%s mutex lock failed, reason:%s"
,
pCtrl
->
label
,
strerror
(
errno
));
if
(
pObj
->
timerId
==
timerId
)
{
pList
=
&
(
pCtrl
->
tmrList
[
pObj
->
index
]);
if
(
pObj
->
prev
)
{
pObj
->
prev
->
next
=
pObj
->
next
;
}
else
{
pList
->
head
=
pObj
->
next
;
static
bool
removeFromWheel
(
tmr_obj_t
*
timer
)
{
if
(
timer
->
wheel
>=
tListLen
(
wheels
))
{
return
false
;
}
time_wheel_t
*
wheel
=
wheels
+
timer
->
wheel
;
bool
removed
=
false
;
pthread_mutex_lock
(
&
wheel
->
mutex
);
// other thread may modify timer->wheel, check again.
if
(
timer
->
wheel
<
tListLen
(
wheels
))
{
if
(
timer
->
prev
!=
NULL
)
{
timer
->
prev
->
next
=
timer
->
next
;
}
if
(
pObj
->
next
)
{
pObj
->
next
->
prev
=
pObj
->
prev
;
if
(
timer
->
next
!=
NULL
)
{
timer
->
next
->
prev
=
timer
->
prev
;
}
pList
->
count
--
;
pObj
->
timerId
=
NULL
;
pCtrl
->
numOfTmrs
--
;
t
mrTrace
(
"%s %p, timer stopped, fp:%p, tmr_h:%p, total:%d"
,
pCtrl
->
label
,
pObj
->
param1
,
pObj
->
fp
,
pObj
,
pCtrl
->
numOfTmrs
);
tmrMemPoolFree
(
pCtrl
->
poolHandle
,
(
char
*
)(
pObj
))
;
if
(
timer
==
wheel
->
slots
[
timer
->
slot
])
{
wheel
->
slots
[
timer
->
slot
]
=
timer
->
next
;
}
timer
->
wheel
=
tListLen
(
wheels
)
;
timer
->
next
=
NULL
;
t
imer
->
prev
=
NULL
;
timerDecRef
(
timer
);
removed
=
true
;
}
pthread_mutex_unlock
(
&
wheel
->
mutex
);
pthread_mutex_unlock
(
&
pCtrl
->
mutex
)
;
return
removed
;
}
void
taosTmrStopA
(
tmr_h
*
timerId
)
{
tmr_obj_t
*
pObj
;
tmr_list_t
*
pList
;
tmr_ctrl_t
*
pCtrl
;
static
void
processExpiredTimer
(
void
*
handle
,
void
*
arg
)
{
tmr_obj_t
*
timer
=
(
tmr_obj_t
*
)
handle
;
timer
->
executedBy
=
taosGetPthreadId
();
uint8_t
state
=
__sync_val_compare_and_swap_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_EXPIRED
);
if
(
state
==
TIMER_STATE_WAITING
)
{
const
char
*
fmt
=
"timer[label=%s, id=%lld, fp=%p, param=%p] execution start."
;
tmrTrace
(
fmt
,
timer
->
ctrl
->
label
,
timer
->
id
,
timer
->
fp
,
timer
->
param
);
pObj
=
*
(
tmr_obj_t
**
)
timerId
;
if
(
pObj
==
NULL
)
return
;
(
*
timer
->
fp
)(
timer
->
param
,
(
tmr_h
)
timer
->
id
)
;
atomic_store_8
(
&
timer
->
state
,
TIMER_STATE_STOPPED
)
;
pCtrl
=
pObj
->
pCtrl
;
if
(
pCtrl
==
NULL
)
return
;
if
(
pthread_mutex_lock
(
&
pCtrl
->
mutex
)
!=
0
)
tmrError
(
"%s mutex lock failed, reason:%s"
,
pCtrl
->
label
,
strerror
(
errno
));
if
(
pObj
->
timerId
==
pObj
)
{
pList
=
&
(
pCtrl
->
tmrList
[
pObj
->
index
]);
if
(
pObj
->
prev
)
{
pObj
->
prev
->
next
=
pObj
->
next
;
}
else
{
pList
->
head
=
pObj
->
next
;
}
fmt
=
"timer[label=%s, id=%lld, fp=%p, param=%p] execution end."
;
tmrTrace
(
fmt
,
timer
->
ctrl
->
label
,
timer
->
id
,
timer
->
fp
,
timer
->
param
);
}
removeTimer
(
timer
->
id
);
timerDecRef
(
timer
);
}
if
(
pObj
->
next
)
{
pObj
->
next
->
prev
=
pObj
->
prev
;
}
static
void
addToExpired
(
tmr_obj_t
*
head
)
{
const
char
*
fmt
=
"timer[label=%s, id=%lld, fp=%p, param=%p] expired"
;
pList
->
count
--
;
pObj
->
timerId
=
NULL
;
pCtrl
->
numOfTmrs
--
;
while
(
head
!=
NULL
)
{
tmrTrace
(
fmt
,
head
->
ctrl
->
label
,
head
->
id
,
head
->
fp
,
head
->
param
);
tmrTrace
(
"%s %p, timer stopped atomiclly, fp:%p, tmr_h:%p, total:%d"
,
pCtrl
->
label
,
pObj
->
param1
,
pObj
->
fp
,
pObj
,
pCtrl
->
numOfTmrs
);
tmrMemPoolFree
(
pCtrl
->
poolHandle
,
(
char
*
)(
pObj
));
tmr_obj_t
*
next
=
head
->
next
;
SSchedMsg
schedMsg
;
schedMsg
.
fp
=
NULL
;
schedMsg
.
tfp
=
processExpiredTimer
;
schedMsg
.
ahandle
=
head
;
schedMsg
.
thandle
=
NULL
;
taosScheduleTask
(
tmrQhandle
,
&
schedMsg
);
head
=
next
;
}
}
*
(
tmr_obj_t
**
)
timerId
=
NULL
;
static
uintptr_t
doStartTimer
(
tmr_obj_t
*
timer
,
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
,
tmr_ctrl_t
*
ctrl
)
{
uintptr_t
id
=
getNextTimerId
();
timer
->
id
=
id
;
timer
->
state
=
TIMER_STATE_WAITING
;
timer
->
fp
=
fp
;
timer
->
param
=
param
;
timer
->
ctrl
=
ctrl
;
addTimer
(
timer
);
const
char
*
fmt
=
"timer[label=%s, id=%lld, fp=%p, param=%p] started"
;
tmrTrace
(
fmt
,
ctrl
->
label
,
timer
->
id
,
timer
->
fp
,
timer
->
param
);
if
(
mseconds
==
0
)
{
timer
->
wheel
=
tListLen
(
wheels
);
timerAddRef
(
timer
);
addToExpired
(
timer
);
}
else
{
tmrTrace
(
"%s %p, timer stopped atomiclly, fp:%p, tmr_h:%p, total:%d"
,
pCtrl
->
label
,
pObj
->
param1
,
pObj
->
fp
,
pObj
,
pCtrl
->
numOfTmrs
);
addToWheel
(
timer
,
mseconds
);
}
pthread_mutex_unlock
(
&
pCtrl
->
mutex
);
// note: use `timer->id` here is unsafe as `timer` may already be freed
return
id
;
}
void
taosTmrReset
(
void
(
*
fp
)(
void
*
,
void
*
),
int
mseconds
,
void
*
param1
,
void
*
handle
,
tmr_h
*
pTmrId
)
{
tmr_obj_t
*
pObj
,
*
cNode
,
*
pNode
;
tmr_list_t
*
pList
;
int
index
,
period
;
tmr_ctrl_t
*
pCtrl
=
(
tmr_ctrl_t
*
)
handle
;
if
(
handle
==
NULL
)
return
;
if
(
pTmrId
==
NULL
)
return
;
period
=
mseconds
/
pCtrl
->
resolution
;
if
(
pthread_mutex_lock
(
&
pCtrl
->
mutex
)
!=
0
)
tmrError
(
"%s mutex lock failed, reason:%s"
,
pCtrl
->
label
,
strerror
(
errno
));
tmr_h
taosTmrStart
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
,
void
*
handle
)
{
tmr_ctrl_t
*
ctrl
=
(
tmr_ctrl_t
*
)
handle
;
if
(
ctrl
==
NULL
||
ctrl
->
label
[
0
]
==
0
)
{
return
NULL
;
}
pObj
=
(
tmr_obj_t
*
)(
*
pTmrId
);
tmr_obj_t
*
timer
=
(
tmr_obj_t
*
)
calloc
(
1
,
sizeof
(
tmr_obj_t
));
if
(
timer
==
NULL
)
{
tmrError
(
"failed to allocated memory for new timer object."
);
return
NULL
;
}
if
(
pObj
&&
pObj
->
timerId
==
*
pTmrId
)
{
// exist, stop it first
pList
=
&
(
pCtrl
->
tmrList
[
pObj
->
index
]);
if
(
pObj
->
prev
)
{
pObj
->
prev
->
next
=
pObj
->
next
;
}
else
{
pList
->
head
=
pObj
->
next
;
}
return
(
tmr_h
)
doStartTimer
(
timer
,
fp
,
mseconds
,
param
,
ctrl
);
}
if
(
pObj
->
next
)
{
pObj
->
next
->
prev
=
pObj
->
prev
;
static
void
taosTimerLoopFunc
(
int
signo
)
{
int64_t
now
=
taosGetTimestampMs
();
for
(
int
i
=
0
;
i
<
tListLen
(
wheels
);
i
++
)
{
// `expried` is a temporary expire list.
// expired timers are first add to this list, then move
// to expired queue as a batch to improve performance.
// note this list is used as a stack in this function.
tmr_obj_t
*
expired
=
NULL
;
time_wheel_t
*
wheel
=
wheels
+
i
;
while
(
now
>=
wheel
->
nextScanAt
)
{
pthread_mutex_lock
(
&
wheel
->
mutex
);
wheel
->
index
=
(
wheel
->
index
+
1
)
%
wheel
->
size
;
tmr_obj_t
*
timer
=
wheel
->
slots
[
wheel
->
index
];
while
(
timer
!=
NULL
)
{
tmr_obj_t
*
next
=
timer
->
next
;
if
(
now
<
timer
->
expireAt
)
{
timer
=
next
;
continue
;
}
// remove from the wheel
if
(
timer
->
prev
==
NULL
)
{
wheel
->
slots
[
wheel
->
index
]
=
next
;
if
(
next
!=
NULL
)
{
next
->
prev
=
NULL
;
}
}
else
{
timer
->
prev
->
next
=
next
;
if
(
next
!=
NULL
)
{
next
->
prev
=
timer
->
prev
;
}
}
timer
->
wheel
=
tListLen
(
wheels
);
// add to temporary expire list
timer
->
next
=
expired
;
timer
->
prev
=
NULL
;
if
(
expired
!=
NULL
)
{
expired
->
prev
=
timer
;
}
expired
=
timer
;
timer
=
next
;
}
pthread_mutex_unlock
(
&
wheel
->
mutex
);
wheel
->
nextScanAt
+=
wheel
->
resolution
;
}
pList
->
count
--
;
pObj
->
timerId
=
NULL
;
pCtrl
->
numOfTmrs
--
;
}
else
{
// timer not there, or already expired
pObj
=
(
tmr_obj_t
*
)
tmrMemPoolMalloc
(
pCtrl
->
poolHandle
);
*
pTmrId
=
pObj
;
if
(
pObj
==
NULL
)
{
tmrError
(
"%s failed to allocate timer, max:%d allocated:%d"
,
pCtrl
->
label
,
pCtrl
->
maxNumOfTmrs
,
pCtrl
->
numOfTmrs
);
pthread_mutex_unlock
(
&
pCtrl
->
mutex
);
return
;
}
addToExpired
(
expired
);
}
}
pObj
->
cycle
=
period
/
pCtrl
->
numOfPeriods
;
pObj
->
param1
=
param1
;
pObj
->
fp
=
fp
;
pObj
->
timerId
=
pObj
;
pObj
->
pCtrl
=
pCtrl
;
static
bool
doStopTimer
(
tmr_obj_t
*
timer
,
uint8_t
state
)
{
bool
reusable
=
false
;
index
=
(
period
+
pCtrl
->
periodsFromStart
)
%
pCtrl
->
numOfPeriods
;
pList
=
&
(
pCtrl
->
tmrList
[
index
]);
if
(
state
==
TIMER_STATE_WAITING
)
{
if
(
removeFromWheel
(
timer
))
{
removeTimer
(
timer
->
id
);
// only safe to reuse the timer when timer is removed from the wheel.
// we cannot guarantee the thread safety of the timr in all other cases.
reusable
=
true
;
}
const
char
*
fmt
=
"timer[label=%s, id=%lld, fp=%p, param=%p] is cancelled."
;
tmrTrace
(
fmt
,
timer
->
ctrl
->
label
,
timer
->
id
,
timer
->
fp
,
timer
->
param
);
}
else
if
(
state
!=
TIMER_STATE_EXPIRED
)
{
// timer already stopped or cancelled, has nothing to do in this case
}
else
if
(
timer
->
executedBy
==
taosGetPthreadId
())
{
// taosTmrReset is called in the timer callback, should do nothing in this
// case to avoid dead lock. note taosTmrReset must be the last statement
// of the callback funtion, will be a bug otherwise.
}
else
{
assert
(
timer
->
executedBy
!=
taosGetPthreadId
());
pObj
->
index
=
index
;
cNode
=
pList
->
head
;
pNode
=
NULL
;
const
char
*
fmt
=
"timer[label=%s, id=%lld, fp=%p, param=%p] fired, waiting..."
;
tmrTrace
(
fmt
,
timer
->
ctrl
->
label
,
timer
->
id
,
timer
->
fp
,
timer
->
param
);
while
(
cNode
!=
NULL
)
{
if
(
cNode
->
cycle
<
pObj
->
cycle
)
{
pNode
=
cNode
;
cNode
=
cNode
->
next
;
}
else
{
break
;
for
(
int
i
=
1
;
atomic_load_8
(
&
timer
->
state
)
!=
TIMER_STATE_STOPPED
;
i
++
)
{
if
(
i
%
1000
==
0
)
{
sched_yield
();
}
}
}
pObj
->
next
=
cNode
;
pObj
->
prev
=
pNode
;
if
(
cNode
!=
NULL
)
{
cNode
->
prev
=
pObj
;
fmt
=
"timer[label=%s, id=%lld, fp=%p, param=%p] stopped."
;
tmrTrace
(
fmt
,
timer
->
ctrl
->
label
,
timer
->
id
,
timer
->
fp
,
timer
->
param
)
;
}
if
(
pNode
!=
NULL
)
{
pNode
->
next
=
pObj
;
}
else
{
pList
->
head
=
pObj
;
}
return
reusable
;
}
pList
->
count
++
;
pCtrl
->
numOfTmrs
++
;
bool
taosTmrStop
(
tmr_h
timerId
)
{
uintptr_t
id
=
(
uintptr_t
)
timerId
;
if
(
pthread_mutex_unlock
(
&
pCtrl
->
mutex
)
!=
0
)
tmrError
(
"%s mutex unlock failed, reason:%s"
,
pCtrl
->
label
,
strerror
(
errno
));
tmr_obj_t
*
timer
=
findTimer
(
id
);
if
(
timer
==
NULL
)
{
tmrTrace
(
"timer[id=%lld] does not exist"
,
id
);
return
false
;
}
tmrTrace
(
"%s %p, timer is reset, fp:%p, tmr_h:%p, index:%d, total:%d numOfFree:%d"
,
pCtrl
->
label
,
param1
,
fp
,
pObj
,
index
,
pCtrl
->
numOfTmrs
,
((
pool_t
*
)
pCtrl
->
poolHandle
)
->
numOfFree
);
uint8_t
state
=
__sync_val_compare_and_swap_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_CANCELED
);
doStopTimer
(
timer
,
state
);
timerDecRef
(
timer
);
return
;
return
state
==
TIMER_STATE_WAITING
;
}
void
taosTmrList
(
void
*
handle
)
{
int
i
;
tmr_list_t
*
pList
;
tmr_obj_t
*
pObj
;
tmr_ctrl_t
*
pCtrl
=
(
tmr_ctrl_t
*
)
handle
;
for
(
i
=
0
;
i
<
pCtrl
->
numOfPeriods
;
++
i
)
{
pList
=
&
(
pCtrl
->
tmrList
[
i
]);
pObj
=
pList
->
head
;
if
(
!
pObj
)
continue
;
printf
(
"
\n
index=%d count:%d
\n
"
,
i
,
pList
->
count
);
while
(
pObj
)
{
pObj
=
pObj
->
next
;
}
}
bool
taosTmrStopA
(
tmr_h
*
timerId
)
{
bool
ret
=
taosTmrStop
(
*
timerId
);
*
timerId
=
NULL
;
return
ret
;
}
mpool_h
tmrMemPoolInit
(
int
numOfBlock
,
int
blockSize
)
{
int
i
;
pool_t
*
pool_p
;
if
(
numOfBlock
<=
1
||
blockSize
<=
1
)
{
tmrError
(
"invalid parameter in memPoolInit
\n
"
);
bool
taosTmrReset
(
TAOS_TMR_CALLBACK
fp
,
int
mseconds
,
void
*
param
,
void
*
handle
,
tmr_h
*
pTmrId
)
{
tmr_ctrl_t
*
ctrl
=
(
tmr_ctrl_t
*
)
handle
;
if
(
ctrl
==
NULL
||
ctrl
->
label
[
0
]
==
0
)
{
return
NULL
;
}
pool_p
=
(
pool_t
*
)
malloc
(
sizeof
(
pool_t
));
if
(
pool_p
==
NULL
)
{
tmrError
(
"mempool malloc failed
\n
"
);
return
NULL
;
uintptr_t
id
=
(
uintptr_t
)
*
pTmrId
;
bool
stopped
=
false
;
tmr_obj_t
*
timer
=
findTimer
(
id
);
if
(
timer
==
NULL
)
{
tmrTrace
(
"timer[id=%lld] does not exist"
,
id
);
}
else
{
memset
(
pool_p
,
0
,
sizeof
(
pool_t
));
uint8_t
state
=
__sync_val_compare_and_swap_8
(
&
timer
->
state
,
TIMER_STATE_WAITING
,
TIMER_STATE_CANCELED
);
if
(
!
doStopTimer
(
timer
,
state
))
{
timerDecRef
(
timer
);
timer
=
NULL
;
}
stopped
=
state
==
TIMER_STATE_WAITING
;
}
pool_p
->
blockSize
=
blockSize
;
pool_p
->
numOfBlock
=
numOfBlock
;
pool_p
->
pool
=
(
char
*
)
malloc
(
blockSize
*
numOfBlock
);
pool_p
->
freeList
=
(
int
*
)
malloc
(
sizeof
(
int
)
*
numOfBlock
);
if
(
pool_p
->
pool
==
NULL
||
pool_p
->
freeList
==
NULL
)
{
tmrError
(
"failed to allocate memory
\n
"
);
tfree
(
pool_p
->
freeList
);
tfree
(
pool_p
->
pool
);
free
(
pool_p
);
return
NULL
;
if
(
timer
==
NULL
)
{
*
pTmrId
=
taosTmrStart
(
fp
,
mseconds
,
param
,
handle
);
return
stopped
;
}
memset
(
pool_p
->
pool
,
0
,
blockSize
*
numOfBlock
);
for
(
i
=
0
;
i
<
pool_p
->
numOfBlock
;
++
i
)
pool_p
->
freeList
[
i
]
=
i
;
tmrTrace
(
"timer[id=%lld] is reused"
,
timer
->
id
);
// wait until there's no other reference to this timer,
// so that we can reuse this timer safely.
for
(
int
i
=
1
;
atomic_load_8
(
&
timer
->
refCount
)
>
1
;
++
i
)
{
if
(
i
%
1000
==
0
)
{
sched_yield
();
}
}
pool_p
->
first
=
0
;
pool_p
->
numOfFree
=
pool_p
->
numOfBlock
;
assert
(
timer
->
refCount
==
1
);
memset
(
timer
,
0
,
sizeof
(
*
timer
));
*
pTmrId
=
(
tmr_h
)
doStartTimer
(
timer
,
fp
,
mseconds
,
param
,
ctrl
);
return
(
mpool_h
)
pool_p
;
return
stopped
;
}
char
*
tmrMemPoolMalloc
(
mpool_h
handle
)
{
char
*
pos
=
NULL
;
pool_t
*
pool_p
=
(
pool_t
*
)
handle
;
static
void
taosTmrModuleInit
(
void
)
{
for
(
int
i
=
0
;
i
<
tListLen
(
tmrCtrls
)
-
1
;
++
i
)
{
tmr_ctrl_t
*
ctrl
=
tmrCtrls
+
i
;
ctrl
->
next
=
ctrl
+
1
;
}
unusedTmrCtrl
=
tmrCtrls
;
pthread_mutex_init
(
&
tmrCtrlMutex
,
NULL
);
if
(
pool_p
->
numOfFree
<=
0
||
pool_p
->
numOfFree
>
pool_p
->
numOfBlock
)
{
tmrError
(
"mempool: out of memory, numOfFree:%d, numOfBlock:%d"
,
pool_p
->
numOfFree
,
pool_p
->
numOfBlock
);
}
else
{
pos
=
pool_p
->
pool
+
pool_p
->
blockSize
*
(
pool_p
->
freeList
[
pool_p
->
first
]);
pool_p
->
first
++
;
pool_p
->
first
=
pool_p
->
first
%
pool_p
->
numOfBlock
;
pool_p
->
numOfFree
--
;
int64_t
now
=
taosGetTimestampMs
();
for
(
int
i
=
0
;
i
<
tListLen
(
wheels
);
i
++
)
{
time_wheel_t
*
wheel
=
wheels
+
i
;
if
(
pthread_mutex_init
(
&
wheel
->
mutex
,
NULL
)
!=
0
)
{
tmrError
(
"failed to create the mutex for wheel, reason:%s"
,
strerror
(
errno
));
return
;
}
wheel
->
nextScanAt
=
now
+
wheel
->
resolution
;
wheel
->
index
=
0
;
wheel
->
slots
=
(
tmr_obj_t
**
)
calloc
(
wheel
->
size
,
sizeof
(
tmr_obj_t
*
));
if
(
wheel
->
slots
==
NULL
)
{
tmrError
(
"failed to allocate wheel slots"
);
return
;
}
timerMap
.
size
+=
wheel
->
size
;
}
return
pos
;
}
timerMap
.
count
=
0
;
timerMap
.
slots
=
(
timer_list_t
*
)
calloc
(
timerMap
.
size
,
sizeof
(
timer_list_t
));
if
(
timerMap
.
slots
==
NULL
)
{
tmrError
(
"failed to allocate hash map"
);
return
;
}
void
tmrMemPoolFree
(
mpool_h
handle
,
char
*
pMem
)
{
int
index
;
pool_t
*
pool_p
=
(
pool_t
*
)
handle
;
tmrQhandle
=
taosInitScheduler
(
10000
,
taosTmrThreads
,
"tmr"
);
taosInitTimer
(
taosTimerLoopFunc
,
MSECONDS_PER_TICK
);
if
(
pMem
==
NULL
)
return
;
tmrTrace
(
"timer module is initialized, number of threads: %d"
,
taosTmrThreads
);
}
index
=
(
int
)(
pMem
-
pool_p
->
pool
)
/
pool_p
->
blockSize
;
void
*
taosTmrInit
(
int
maxNumOfTmrs
,
int
resolution
,
int
longest
,
const
char
*
label
)
{
pthread_once
(
&
tmrModuleInit
,
taosTmrModuleInit
);
if
(
index
<
0
||
index
>=
pool_p
->
numOfBlock
)
{
tmrError
(
"tmr mempool: error, invalid address:%p
\n
"
,
pMem
);
}
else
{
memset
(
pMem
,
0
,
pool_p
->
blockSize
);
pool_p
->
freeList
[(
pool_p
->
first
+
pool_p
->
numOfFree
)
%
pool_p
->
numOfBlock
]
=
index
;
pool_p
->
numOfFree
++
;
pthread_mutex_lock
(
&
tmrCtrlMutex
);
tmr_ctrl_t
*
ctrl
=
unusedTmrCtrl
;
if
(
ctrl
!=
NULL
)
{
unusedTmrCtrl
=
ctrl
->
next
;
}
pthread_mutex_unlock
(
&
tmrCtrlMutex
);
if
(
ctrl
==
NULL
)
{
tmrError
(
"too many timer controllers, failed to create timer controller[label=%s]."
,
label
);
return
NULL
;
}
strncpy
(
ctrl
->
label
,
label
,
sizeof
(
ctrl
->
label
));
ctrl
->
label
[
sizeof
(
ctrl
->
label
)
-
1
]
=
0
;
tmrTrace
(
"timer controller[label=%s] is initialized."
,
label
);
return
ctrl
;
}
void
tmrMemPoolCleanUp
(
mpool_h
handle
)
{
pool_t
*
pool_p
=
(
pool_t
*
)
handle
;
if
(
pool_p
==
NULL
)
return
;
void
taosTmrCleanUp
(
void
*
handle
)
{
tmr_ctrl_t
*
ctrl
=
(
tmr_ctrl_t
*
)
handle
;
assert
(
ctrl
!=
NULL
&&
ctrl
->
label
[
0
]
!=
0
);
tmrTrace
(
"timer controller[label=%s] is cleaned up."
,
ctrl
->
label
);
ctrl
->
label
[
0
]
=
0
;
if
(
pool_p
->
pool
)
free
(
pool_p
->
pool
);
if
(
pool_p
->
freeList
)
free
(
pool_p
->
freeList
)
;
memset
(
&
pool_p
,
0
,
sizeof
(
pool_p
))
;
free
(
pool_p
);
pthread_mutex_lock
(
&
tmrCtrlMutex
);
ctrl
->
next
=
unusedTmrCtrl
;
unusedTmrCtrl
=
ctrl
;
pthread_mutex_unlock
(
&
tmrCtrlMutex
);
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录