diff --git a/src/inc/ttimer.h b/src/inc/ttimer.h index 8a7e81dac73873e25cead301db4f594651fc7c6f..ddfd3f1fdf2a6d5e6a46ad16a047ed209ebcce8b 100644 --- a/src/inc/ttimer.h +++ b/src/inc/ttimer.h @@ -21,40 +21,41 @@ extern "C" { #endif typedef void *tmr_h; +typedef void (*TAOS_TMR_CALLBACK)(void *, void *); extern uint32_t tmrDebugFlag; -extern int taosTmrThreads; +extern int taosTmrThreads; #define tmrError(...) \ - if (tmrDebugFlag & DEBUG_ERROR) { \ + do { if (tmrDebugFlag & DEBUG_ERROR) { \ tprintf("ERROR TMR ", tmrDebugFlag, __VA_ARGS__); \ - } + } } while(0) + #define tmrWarn(...) \ - if (tmrDebugFlag & DEBUG_WARN) { \ + do { if (tmrDebugFlag & DEBUG_WARN) { \ tprintf("WARN TMR ", tmrDebugFlag, __VA_ARGS__); \ - } + } } while(0) + #define tmrTrace(...) \ - if (tmrDebugFlag & DEBUG_TRACE) { \ + do { if (tmrDebugFlag & DEBUG_TRACE) { \ tprintf("TMR ", tmrDebugFlag, __VA_ARGS__); \ - } + } } while(0) -#define MAX_NUM_OF_TMRCTL 512 +#define MAX_NUM_OF_TMRCTL 32 #define MSECONDS_PER_TICK 5 -void *taosTmrInit(int maxTmr, int resoultion, int longest, char *label); +void *taosTmrInit(int maxTmr, int resoultion, int longest, const char *label); -tmr_h taosTmrStart(void (*fp)(void *, void *), int mseconds, void *param1, void *handle); +tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void *param, void *handle); -void taosTmrStop(tmr_h tmrId); +bool taosTmrStop(tmr_h tmrId); -void taosTmrStopA(tmr_h *timerId); +bool taosTmrStopA(tmr_h *timerId); -void taosTmrReset(void (*fp)(void *, void *), int mseconds, void *param1, void *handle, tmr_h *pTmrId); +bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void *param, void *handle, tmr_h *pTmrId); void taosTmrCleanUp(void *handle); -void taosTmrList(void *handle); - #ifdef __cplusplus } #endif diff --git a/src/os/darwin/inc/os.h b/src/os/darwin/inc/os.h index c9f461d9a0d93e74710f13496e94638cd417bbbb..1444da5dded7c1c7fd561074edfd779b2b2c19d3 100644 --- a/src/os/darwin/inc/os.h +++ b/src/os/darwin/inc/os.h @@ -55,10 +55,44 @@ #define taosWriteSocket(fd, buf, len) write(fd, buf, len) #define taosReadSocket(fd, buf, len) read(fd, buf, len) +#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) +#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) +#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) +#define atomic_load_64(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) +#define atomic_load_ptr(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) + +#define atomic_store_8(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_store_16(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_store_32(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_store_64(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_store_ptr(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_exchange_8(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_exchange_16(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_exchange_32(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) + +// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler +// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins. #define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap #define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap +#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap +#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap +#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap + #define __sync_add_and_fetch_64 __sync_add_and_fetch #define __sync_add_and_fetch_32 __sync_add_and_fetch +#define __sync_add_and_fetch_16 __sync_add_and_fetch +#define __sync_add_and_fetch_ptr __sync_add_and_fetch +#define __sync_add_and_fetch_8 __sync_add_and_fetch + +#define __sync_sub_and_fetch_64 __sync_sub_and_fetch +#define __sync_sub_and_fetch_32 __sync_sub_and_fetch +#define __sync_sub_and_fetch_16 __sync_sub_and_fetch +#define __sync_sub_and_fetch_8 __sync_sub_and_fetch +#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch + int32_t __sync_val_load_32(int32_t *ptr); void __sync_val_restore_32(int32_t *ptr, int32_t newval); diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index 004896960eb18881e4dab1b8dbbb512744619790..39b10023a338a7227378dfaab056b288915fd93b 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -71,14 +71,43 @@ extern "C" { #define taosWriteSocket(fd, buf, len) write(fd, buf, len) #define taosReadSocket(fd, buf, len) read(fd, buf, len) +#define atomic_load_8(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) +#define atomic_load_16(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) +#define atomic_load_32(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) +#define atomic_load_64(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) +#define atomic_load_ptr(ptr) __atomic_load_n((ptr), __ATOMIC_SEQ_CST) + +#define atomic_store_8(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_store_16(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_store_32(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_store_64(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_store_ptr(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST) + +#define atomic_exchange_8(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_exchange_16(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_exchange_32(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_exchange_64(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) +#define atomic_exchange_ptr(ptr, val) __atomic_exchange_n((ptr), (val), __ATOMIC_SEQ_CST) + +// TODO: update prefix of below macros to 'atomic' as '__' is reserved by compiler +// and GCC suggest new code to use '__atomic' builtins to replace '__sync' builtins. #define __sync_val_compare_and_swap_64 __sync_val_compare_and_swap #define __sync_val_compare_and_swap_32 __sync_val_compare_and_swap +#define __sync_val_compare_and_swap_16 __sync_val_compare_and_swap +#define __sync_val_compare_and_swap_8 __sync_val_compare_and_swap +#define __sync_val_compare_and_swap_ptr __sync_val_compare_and_swap #define __sync_add_and_fetch_64 __sync_add_and_fetch #define __sync_add_and_fetch_32 __sync_add_and_fetch +#define __sync_add_and_fetch_16 __sync_add_and_fetch +#define __sync_add_and_fetch_ptr __sync_add_and_fetch +#define __sync_add_and_fetch_8 __sync_add_and_fetch #define __sync_sub_and_fetch_64 __sync_sub_and_fetch #define __sync_sub_and_fetch_32 __sync_sub_and_fetch +#define __sync_sub_and_fetch_16 __sync_sub_and_fetch +#define __sync_sub_and_fetch_8 __sync_sub_and_fetch +#define __sync_sub_and_fetch_ptr __sync_sub_and_fetch int32_t __sync_val_load_32(int32_t *ptr); void __sync_val_restore_32(int32_t *ptr, int32_t newval); diff --git a/src/os/windows/inc/os.h b/src/os/windows/inc/os.h index 1a840ed1ef0c21263ce5f42c0baa6efd7ef7f025..d308fff9731af37b3a7cb26f3dc6d93ec18b4156 100644 --- a/src/os/windows/inc/os.h +++ b/src/os/windows/inc/os.h @@ -29,6 +29,7 @@ #include #include #include +#include #ifdef __cplusplus extern "C" { @@ -78,12 +79,72 @@ extern "C" { #define taosWriteSocket(fd, buf, len) send(fd, buf, len, 0) #define taosReadSocket(fd, buf, len) recv(fd, buf, len, 0) -int32_t __sync_val_compare_and_swap_32(int32_t *ptr, int32_t oldval, int32_t newval); -int32_t __sync_add_and_fetch_32(int32_t *ptr, int32_t val); -int32_t __sync_sub_and_fetch_32(int32_t *ptr, int32_t val); -int64_t __sync_val_compare_and_swap_64(int64_t *ptr, int64_t oldval, int64_t newval); -int64_t __sync_add_and_fetch_64(int64_t *ptr, int64_t val); -int64_t __sync_sub_and_fetch_64(int64_t *ptr, int64_t val); +#if defined(_M_ARM) || defined(_M_ARM64) + +#define atomic_load_8(ptr) __iso_volatile_load8((const volatile __int8*)(ptr)) +#define atomic_load_16(ptr) __iso_volatile_load16((const volatile __int16*)(ptr)) +#define atomic_load_32(ptr) __iso_volatile_load32((const volatile __int32*)(ptr)) +#define atomic_load_64(ptr) __iso_volatile_load64((const volatile __int64*)(ptr)) + +#define atomic_store_8(ptr, val) __iso_volatile_store8((volatile __int8*)(ptr), (__int8)(val)) +#define atomic_store_16(ptr, val) __iso_volatile_store16((volatile __int16*)(ptr), (__int16)(val)) +#define atomic_store_32(ptr, val) __iso_volatile_store32((volatile __int32*)(ptr), (__int32)(val)) +#define atomic_store_64(ptr, val) __iso_volatile_store64((volatile __int64*)(ptr), (__int64)(val)) + +#ifdef _M_ARM64 +#define atomic_load_ptr atomic_load_64 +#define atomic_store_ptr atomic_store_64 +#else +#define atomic_load_ptr atomic_load_32 +#define atomic_store_ptr atomic_store_32 +#endif + +#else + +#define atomic_load_8(ptr) (*(char volatile*)(ptr)) +#define atomic_load_16(ptr) (*(short volatile*)(ptr)) +#define atomic_load_32(ptr) (*(long volatile*)(ptr)) +#define atomic_load_64(ptr) (*(__int64 volatile*)(ptr)) +#define atomic_load_ptr(ptr) (*(void* volatile*)(ptr)) + +#define atomic_store_8(ptr, val) ((*(char volatile*)(ptr)) = (char)(val)) +#define atomic_store_16(ptr, val) ((*(short volatile*)(ptr)) = (short)(val)) +#define atomic_store_32(ptr, val) ((*(long volatile*)(ptr)) = (long)(val)) +#define atomic_store_64(ptr, val) ((*(__int64 volatile*)(ptr)) = (__int64)(val)) +#define atomic_store_ptr(ptr, val) ((*(void* volatile*)(ptr)) = (void*)(val)) + +#endif + +#define atomic_exchange_8(ptr, val) _InterlockedExchange8((char volatile*)(ptr), (char)(val)) +#define atomic_exchange_16(ptr, val) _InterlockedExchange16((short volatile*)(ptr), (short)(val)) +#define atomic_exchange_32(ptr, val) _InterlockedExchange((long volatile*)(ptr), (long)(val)) +#define atomic_exchange_64(ptr, val) _InterlockedExchange64((__int64 volatile*)(ptr), (__int64)(val)) +#define atomic_exchange_ptr(ptr, val) _InterlockedExchangePointer((void* volatile*)(ptr), (void*)(val)) + +#define __sync_val_compare_and_swap_8(ptr, oldval, newval) _InterlockedCompareExchange8((char volatile*)(ptr), (char)(newval), (char)(oldval)) +#define __sync_val_compare_and_swap_16(ptr, oldval, newval) _InterlockedCompareExchange16((short volatile*)(ptr), (short)(newval), (short)(oldval)) +#define __sync_val_compare_and_swap_32(ptr, oldval, newval) _InterlockedCompareExchange((long volatile*)(ptr), (long)(newval), (long)(oldval)) +#define __sync_val_compare_and_swap_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval)) +#define __sync_val_compare_and_swap_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval)) + +char interlocked_add_8(char volatile* ptr, char val); +short interlocked_add_16(short volatile* ptr, short val); +#define __sync_add_and_fetch_8(ptr, val) interlocked_add_8((char volatile*)(ptr), (char)(val)) +#define __sync_add_and_fetch_16(ptr, val) interlocked_add_16((short volatile*)(ptr), (short)(val)) +#define __sync_add_and_fetch_32(ptr, val) _InterlockedAdd((long volatile*)(ptr), (long)(val)) +#define __sync_add_and_fetch_64(ptr, val) _InterlockedAdd64((__int64 volatile*)(ptr), (__int64)(val)) +#ifdef _WIN64 + #define __sync_add_and_fetch_ptr atomic_add_fetch_64 +#else + #define __sync_add_and_fetch_ptr atomic_add_fetch_32 +#endif + +#define __sync_sub_and_fetch_8(ptr, val) __sync_add_and_fetch_8((ptr), -(val)) +#define __sync_sub_and_fetch_16(ptr, val) __sync_add_and_fetch_16((ptr), -(val)) +#define __sync_sub_and_fetch_32(ptr, val) __sync_add_and_fetch_32((ptr), -(val)) +#define __sync_sub_and_fetch_64(ptr, val) __sync_add_and_fetch_64((ptr), -(val)) +#define __sync_sub_and_fetch_ptr(ptr, val) __sync_add_and_fetch_ptr((ptr), -(val)) + int32_t __sync_val_load_32(int32_t *ptr); void __sync_val_restore_32(int32_t *ptr, int32_t newval); diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index b1c3112bdbf6f899e1e14ddfba5070636bc58cc0..8cc36ae4db5f6bfb4e5fce15b5f4681da16cd806 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -63,28 +63,13 @@ int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optle return setsockopt(socketfd, level, optname, optval, optlen); } -int32_t __sync_val_compare_and_swap_32(int32_t *ptr, int32_t oldval, int32_t newval) { - return InterlockedCompareExchange(ptr, newval, oldval); -} - -int32_t __sync_add_and_fetch_32(int32_t *ptr, int32_t val) { - return InterlockedAdd(ptr, val); -} - -int32_t __sync_sub_and_fetch_32(int32_t *ptr, int32_t val) { - return InterlockedAdd(ptr, -val); -} - -int64_t __sync_val_compare_and_swap_64(int64_t *ptr, int64_t oldval, int64_t newval) { - return InterlockedCompareExchange64(ptr, newval, oldval); -} -int64_t __sync_add_and_fetch_64(int64_t *ptr, int64_t val) { - return InterlockedAdd64(ptr, val); +char interlocked_add_8(char volatile* ptr, char val) { + return _InterlockedExchangeAdd8(ptr, val) + val; } -int64_t __sync_sub_and_fetch_64(int64_t *ptr, int64_t val) { - return InterlockedAdd64(ptr, -val); +short interlocked_add_16(short volatile* ptr, short val) { + return _InterlockedExchangeAdd16(ptr, val) + val; } int32_t __sync_val_load_32(int32_t *ptr) { diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 5c726390e736a0abc5189796e0173c592757413d..85fc4d459565618dd0b222939f4fa6338d0d0b26 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -16,555 +16,548 @@ #include #include #include -#include -#include +#include #include #include -#include - #include "os.h" -#include "tidpool.h" #include "tlog.h" #include "tsched.h" +#include "ttime.h" #include "ttimer.h" #include "tutil.h" -// special mempool without mutex -#define mpool_h void * - -typedef struct { - int numOfFree; /* number of free slots */ - int first; /* the first free slot */ - int numOfBlock; /* the number of blocks */ - int blockSize; /* block size in bytes */ - int * freeList; /* the index list */ - char *pool; /* the actual mem block */ -} pool_t; - -mpool_h tmrMemPoolInit(int maxNum, int blockSize); -char *tmrMemPoolMalloc(mpool_h handle); -void tmrMemPoolFree(mpool_h handle, char *p); -void tmrMemPoolCleanUp(mpool_h handle); - -typedef struct _tmr_obj { - void *param1; - void (*fp)(void *, void *); - tmr_h timerId; - short cycle; - struct _tmr_obj * prev; - struct _tmr_obj * next; - int index; - struct _tmr_ctrl_t *pCtrl; -} tmr_obj_t; - -typedef struct { - tmr_obj_t *head; - int count; -} tmr_list_t; - -typedef struct _tmr_ctrl_t { - void * signature; - pthread_mutex_t mutex; /* mutex to protect critical resource */ - int resolution; /* resolution in mseconds */ - int numOfPeriods; /* total number of periods */ - int64_t periodsFromStart; /* count number of periods since start */ - pthread_t thread; /* timer thread ID */ - tmr_list_t * tmrList; - mpool_h poolHandle; - char label[12]; - int maxNumOfTmrs; - int numOfTmrs; - int ticks; - int maxTicks; - int tmrCtrlId; -} tmr_ctrl_t; -uint32_t tmrDebugFlag = DEBUG_ERROR | DEBUG_WARN | DEBUG_FILE; -void taosTmrProcessList(tmr_ctrl_t *); - -tmr_ctrl_t tmrCtrl[MAX_NUM_OF_TMRCTL]; -int numOfTmrCtrl = 0; -void * tmrIdPool = NULL; -void * tmrQhandle; -int taosTmrThreads = 1; - -void taosTimerLoopFunc(int signo) { - tmr_ctrl_t *pCtrl; - int count = 0; - - for (int i = 1; i < MAX_NUM_OF_TMRCTL; ++i) { - pCtrl = tmrCtrl + i; - if (pCtrl->signature) { - count++; - pCtrl->ticks++; - if (pCtrl->ticks >= pCtrl->maxTicks) { - taosTmrProcessList(pCtrl); - pCtrl->ticks = 0; - } - if (count >= numOfTmrCtrl) break; - } - } +static uintptr_t pthreadGetId() { +#ifdef PTW32_VERSION + return pthread_getw32threadid_np(pthread_self()); +#else + assert(sizeof(pthread_t) == sizeof(uintptr_t)); + return (uintptr_t)pthread_self(); +#endif } -void taosTmrModuleInit(void) { - tmrIdPool = taosInitIdPool(MAX_NUM_OF_TMRCTL); - memset(tmrCtrl, 0, sizeof(tmrCtrl)); +#define TIMER_STATE_WAITING 0 +#define TIMER_STATE_EXPIRED 1 +#define TIMER_STATE_STOPPED 2 +#define TIMER_STATE_CANCELED 3 + +typedef union _tmr_ctrl_t { + char label[16]; + struct { + // pad to ensure 'next' is the end of this union + char padding[16 - sizeof(union _tmr_ctrl_t*)]; + union _tmr_ctrl_t* next; + }; +} tmr_ctrl_t; - taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK); +typedef struct tmr_obj_t { + uintptr_t id; + tmr_ctrl_t* ctrl; + struct tmr_obj_t* mnext; + struct tmr_obj_t* prev; + struct tmr_obj_t* next; + uint16_t slot; + uint8_t wheel; + uint8_t state; + uint8_t refCount; + uint8_t reserved1; + uint16_t reserved2; + union { + int64_t expireAt; + uintptr_t executedBy; + }; + TAOS_TMR_CALLBACK fp; + void* param; +} tmr_obj_t; - tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr"); - tmrTrace("timer module is initialized, thread:%d", taosTmrThreads); +typedef struct timer_list_t { + uintptr_t lockedBy; + tmr_obj_t* timers; +} timer_list_t; + +typedef struct timer_map_t { + uint32_t size; + uint32_t count; + timer_list_t* slots; +} timer_map_t; + +typedef struct time_wheel_t { + pthread_mutex_t mutex; + int64_t nextScanAt; + uint32_t resolution; + uint16_t size; + uint16_t index; + tmr_obj_t** slots; +} time_wheel_t; + +uint32_t tmrDebugFlag = DEBUG_ERROR | DEBUG_WARN | DEBUG_FILE; + +static pthread_once_t tmrModuleInit = PTHREAD_ONCE_INIT; +static pthread_mutex_t tmrCtrlMutex; +static tmr_ctrl_t tmrCtrls[MAX_NUM_OF_TMRCTL]; +static tmr_ctrl_t* unusedTmrCtrl = NULL; +void* tmrQhandle; +int taosTmrThreads = 1; + +static uintptr_t nextTimerId = 0; + +static time_wheel_t wheels[] = { + {.resolution = MSECONDS_PER_TICK, .size = 4096}, + {.resolution = 1000, .size = 1024}, + {.resolution = 60000, .size = 1024}, +}; +static timer_map_t timerMap; + +static uintptr_t getNextTimerId() { + uintptr_t id; + do { + id = __sync_add_and_fetch_ptr(&nextTimerId, 1); + } while (id == 0); + return id; } -void *taosTmrInit(int maxNumOfTmrs, int resolution, int longest, char *label) { - static pthread_once_t tmrInit = PTHREAD_ONCE_INIT; - tmr_ctrl_t * pCtrl; - - pthread_once(&tmrInit, taosTmrModuleInit); +static void timerAddRef(tmr_obj_t* timer) { __sync_add_and_fetch_8(&timer->refCount, 1); } - int tmrCtrlId = taosAllocateId(tmrIdPool); - - if (tmrCtrlId < 0) { - tmrError("%s bug!!! too many timers!!!", label); - return NULL; - } - - pCtrl = tmrCtrl + tmrCtrlId; - tfree(pCtrl->tmrList); - tmrMemPoolCleanUp(pCtrl->poolHandle); - - memset(pCtrl, 0, sizeof(tmr_ctrl_t)); - - pCtrl->tmrCtrlId = tmrCtrlId; - strcpy(pCtrl->label, label); - pCtrl->maxNumOfTmrs = maxNumOfTmrs; - - if ((pCtrl->poolHandle = tmrMemPoolInit(maxNumOfTmrs + 10, sizeof(tmr_obj_t))) == NULL) { - tmrError("%s failed to allocate mem pool", label); - tmrMemPoolCleanUp(pCtrl->poolHandle); - return NULL; +static void timerDecRef(tmr_obj_t* timer) { + if (__sync_sub_and_fetch_8(&timer->refCount, 1) == 0) { + free(timer); } +} - if (resolution < MSECONDS_PER_TICK) resolution = MSECONDS_PER_TICK; - pCtrl->resolution = resolution; - pCtrl->maxTicks = resolution / MSECONDS_PER_TICK; - pCtrl->ticks = rand() / pCtrl->maxTicks; - pCtrl->numOfPeriods = longest / resolution; - if (pCtrl->numOfPeriods < 10) pCtrl->numOfPeriods = 10; - - pCtrl->tmrList = (tmr_list_t *)malloc(sizeof(tmr_list_t) * pCtrl->numOfPeriods); - if (pCtrl->tmrList == NULL) { - tmrError("%s failed to allocate(size:%d) mem for tmrList", label, sizeof(tmr_list_t) * pCtrl->numOfPeriods); - tmrMemPoolCleanUp(pCtrl->poolHandle); - taosTmrCleanUp(pCtrl); - return NULL; - } - - for (int i = 0; i < pCtrl->numOfPeriods; i++) { - pCtrl->tmrList[i].head = NULL; - pCtrl->tmrList[i].count = 0; +static void lockTimerList(timer_list_t* list) { + uintptr_t tid = pthreadGetId(); + int i = 0; + while (__sync_val_compare_and_swap_ptr(&(list->lockedBy), 0, tid) != 0) { + if (++i % 1000 == 0) { + sched_yield(); + } } +} - if (pthread_mutex_init(&pCtrl->mutex, NULL) < 0) { - tmrError("%s failed to create the mutex, reason:%s", label, strerror(errno)); - taosTmrCleanUp(pCtrl); - return NULL; +static void unlockTimerList(timer_list_t* list) { + uintptr_t tid = pthreadGetId(); + if (__sync_val_compare_and_swap_ptr(&(list->lockedBy), tid, 0) != tid) { + assert(false); + tmrError("trying to unlock a timer list not locked by current thread."); } - - pCtrl->signature = pCtrl; - numOfTmrCtrl++; - tmrTrace("%s timer ctrl is initialized, index:%d", label, tmrCtrlId); - return pCtrl; } -void taosTmrProcessList(tmr_ctrl_t *pCtrl) { - unsigned int index; - tmr_list_t * pList; - tmr_obj_t * pObj, *header; +static void addTimer(tmr_obj_t* timer) { + timerAddRef(timer); + timer->wheel = tListLen(wheels); - pthread_mutex_lock(&pCtrl->mutex); - index = pCtrl->periodsFromStart % pCtrl->numOfPeriods; - pList = &pCtrl->tmrList[index]; + uint32_t idx = (uint32_t)(timer->id % timerMap.size); + timer_list_t* list = timerMap.slots + idx; - while (1) { - header = pList->head; - if (header == NULL) break; + lockTimerList(list); + timer->mnext = list->timers; + list->timers = timer; + unlockTimerList(list); +} - if (header->cycle > 0) { - pObj = header; - while (pObj) { - pObj->cycle--; - pObj = pObj->next; +static tmr_obj_t* findTimer(uintptr_t id) { + tmr_obj_t* timer = NULL; + if (id > 0) { + uint32_t idx = (uint32_t)(id % timerMap.size); + timer_list_t* list = timerMap.slots + idx; + lockTimerList(list); + for (timer = list->timers; timer != NULL; timer = timer->mnext) { + if (timer->id == id) { + timerAddRef(timer); + break; } - break; } - - pCtrl->numOfTmrs--; - tmrTrace("%s %p, timer expired, fp:%p, tmr_h:%p, index:%d, total:%d", pCtrl->label, header->param1, header->fp, - header, index, pCtrl->numOfTmrs); - - pList->head = header->next; - if (header->next) header->next->prev = NULL; - pList->count--; - header->timerId = NULL; - - SSchedMsg schedMsg; - schedMsg.fp = NULL; - schedMsg.tfp = header->fp; - schedMsg.ahandle = header->param1; - schedMsg.thandle = header; - taosScheduleTask(tmrQhandle, &schedMsg); - - tmrMemPoolFree(pCtrl->poolHandle, (char *)header); + unlockTimerList(list); } - - pCtrl->periodsFromStart++; - pthread_mutex_unlock(&pCtrl->mutex); -} - -void taosTmrCleanUp(void *handle) { - tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle; - if (pCtrl == NULL || pCtrl->signature != pCtrl) return; - - pCtrl->signature = NULL; - taosFreeId(tmrIdPool, pCtrl->tmrCtrlId); - numOfTmrCtrl--; - tmrTrace("%s is cleaned up, numOfTmrs:%d", pCtrl->label, numOfTmrCtrl); + return timer; } -tmr_h taosTmrStart(void (*fp)(void *, void *), int mseconds, void *param1, void *handle) { - tmr_obj_t * pObj, *cNode, *pNode; - tmr_list_t *pList; - int index, period; - tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle; - - if (handle == NULL) return NULL; - - period = mseconds / pCtrl->resolution; - if (pthread_mutex_lock(&pCtrl->mutex) != 0) - tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno)); - - pObj = (tmr_obj_t *)tmrMemPoolMalloc(pCtrl->poolHandle); - if (pObj == NULL) { - tmrError("%s reach max number of timers:%d", pCtrl->label, pCtrl->maxNumOfTmrs); - pthread_mutex_unlock(&pCtrl->mutex); - return NULL; +static void removeTimer(uintptr_t id) { + tmr_obj_t* prev = NULL; + uint32_t idx = (uint32_t)(id % timerMap.size); + timer_list_t* list = timerMap.slots + idx; + lockTimerList(list); + for (tmr_obj_t* p = list->timers; p != NULL; p = p->mnext) { + if (p->id == id) { + if (prev == NULL) { + list->timers = p->mnext; + } else { + prev->mnext = p->mnext; + } + timerDecRef(p); + break; + } + prev = p; } + unlockTimerList(list); +} - pObj->cycle = period / pCtrl->numOfPeriods; - pObj->param1 = param1; - pObj->fp = fp; - pObj->timerId = pObj; - pObj->pCtrl = pCtrl; - - index = (period + pCtrl->periodsFromStart) % pCtrl->numOfPeriods; - int cindex = (pCtrl->periodsFromStart) % pCtrl->numOfPeriods; - pList = &(pCtrl->tmrList[index]); - - pObj->index = index; - cNode = pList->head; - pNode = NULL; - - while (cNode != NULL) { - if (cNode->cycle < pObj->cycle) { - pNode = cNode; - cNode = cNode->next; - } else { +static void addToWheel(tmr_obj_t* timer, uint32_t delay) { + timerAddRef(timer); + // select a wheel for the timer, we are not an accurate timer, + // but the inaccuracy should not be too large. + timer->wheel = tListLen(wheels) - 1; + for (uint8_t i = 0; i < tListLen(wheels); i++) { + time_wheel_t* wheel = wheels + i; + if (delay < wheel->resolution * wheel->size) { + timer->wheel = i; break; } } - pObj->next = cNode; - pObj->prev = pNode; + time_wheel_t* wheel = wheels + timer->wheel; + timer->prev = NULL; + timer->expireAt = taosGetTimestampMs() + delay; - if (cNode != NULL) { - cNode->prev = pObj; - } + pthread_mutex_lock(&wheel->mutex); - if (pNode != NULL) { - pNode->next = pObj; - } else { - pList->head = pObj; + uint32_t idx = 0; + if (timer->expireAt > wheel->nextScanAt) { + // adjust delay according to next scan time of this wheel + // so that the timer is not fired earlier than desired. + delay = (uint32_t)(timer->expireAt - wheel->nextScanAt); + idx = (delay + wheel->resolution - 1) / wheel->resolution; } - pList->count++; - pCtrl->numOfTmrs++; - - if (pthread_mutex_unlock(&pCtrl->mutex) != 0) - tmrError("%s mutex unlock failed, reason:%s", pCtrl->label, strerror(errno)); - - tmrTrace("%s %p, timer started, fp:%p, tmr_h:%p, index:%d, total:%d cindex:%d", pCtrl->label, param1, fp, pObj, index, - pCtrl->numOfTmrs, cindex); + timer->slot = (uint16_t)((wheel->index + idx + 1) % wheel->size); + tmr_obj_t* p = wheel->slots[timer->slot]; + wheel->slots[timer->slot] = timer; + timer->next = p; + if (p != NULL) { + p->prev = timer; + } - return (tmr_h)pObj; + pthread_mutex_unlock(&wheel->mutex); } -void taosTmrStop(tmr_h timerId) { - tmr_obj_t * pObj; - tmr_list_t *pList; - tmr_ctrl_t *pCtrl; - - pObj = (tmr_obj_t *)timerId; - if (pObj == NULL) return; - - pCtrl = pObj->pCtrl; - if (pCtrl == NULL) return; - - if (pthread_mutex_lock(&pCtrl->mutex) != 0) - tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno)); - - if (pObj->timerId == timerId) { - pList = &(pCtrl->tmrList[pObj->index]); - if (pObj->prev) { - pObj->prev->next = pObj->next; - } else { - pList->head = pObj->next; +static bool removeFromWheel(tmr_obj_t* timer) { + if (timer->wheel >= tListLen(wheels)) { + return false; + } + time_wheel_t* wheel = wheels + timer->wheel; + + bool removed = false; + pthread_mutex_lock(&wheel->mutex); + // other thread may modify timer->wheel, check again. + if (timer->wheel < tListLen(wheels)) { + if (timer->prev != NULL) { + timer->prev->next = timer->next; } - - if (pObj->next) { - pObj->next->prev = pObj->prev; + if (timer->next != NULL) { + timer->next->prev = timer->prev; } - - pList->count--; - pObj->timerId = NULL; - pCtrl->numOfTmrs--; - - tmrTrace("%s %p, timer stopped, fp:%p, tmr_h:%p, total:%d", pCtrl->label, pObj->param1, pObj->fp, pObj, - pCtrl->numOfTmrs); - tmrMemPoolFree(pCtrl->poolHandle, (char *)(pObj)); + if (timer == wheel->slots[timer->slot]) { + wheel->slots[timer->slot] = timer->next; + } + timer->wheel = tListLen(wheels); + timer->next = NULL; + timer->prev = NULL; + timerDecRef(timer); + removed = true; } + pthread_mutex_unlock(&wheel->mutex); - pthread_mutex_unlock(&pCtrl->mutex); + return removed; } -void taosTmrStopA(tmr_h *timerId) { - tmr_obj_t * pObj; - tmr_list_t *pList; - tmr_ctrl_t *pCtrl; - - pObj = *(tmr_obj_t **)timerId; - if (pObj == NULL) return; +static void processExpiredTimer(void* handle, void* arg) { + tmr_obj_t* timer = (tmr_obj_t*)handle; + timer->executedBy = pthreadGetId(); + uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED); + if (state == TIMER_STATE_WAITING) { + const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] execution start."; + tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); - pCtrl = pObj->pCtrl; - if (pCtrl == NULL) return; + (*timer->fp)(timer->param, (tmr_h)timer->id); + atomic_store_8(&timer->state, TIMER_STATE_STOPPED); - if (pthread_mutex_lock(&pCtrl->mutex) != 0) - tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno)); - - if (pObj->timerId == pObj) { - pList = &(pCtrl->tmrList[pObj->index]); - if (pObj->prev) { - pObj->prev->next = pObj->next; - } else { - pList->head = pObj->next; - } + fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] execution end."; + tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); + } + removeTimer(timer->id); + timerDecRef(timer); +} - if (pObj->next) { - pObj->next->prev = pObj->prev; - } +static void addToExpired(tmr_obj_t* head) { + const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] expired"; - pList->count--; - pObj->timerId = NULL; - pCtrl->numOfTmrs--; + while (head != NULL) { + tmrTrace(fmt, head->ctrl->label, head->id, head->fp, head->param); - tmrTrace("%s %p, timer stopped atomiclly, fp:%p, tmr_h:%p, total:%d", pCtrl->label, pObj->param1, pObj->fp, pObj, - pCtrl->numOfTmrs); - tmrMemPoolFree(pCtrl->poolHandle, (char *)(pObj)); + tmr_obj_t* next = head->next; + SSchedMsg schedMsg; + schedMsg.fp = NULL; + schedMsg.tfp = processExpiredTimer; + schedMsg.ahandle = head; + schedMsg.thandle = NULL; + taosScheduleTask(tmrQhandle, &schedMsg); + head = next; + } +} - *(tmr_obj_t **)timerId = NULL; +static uintptr_t doStartTimer(tmr_obj_t* timer, TAOS_TMR_CALLBACK fp, int mseconds, void* param, tmr_ctrl_t* ctrl) { + uintptr_t id = getNextTimerId(); + timer->id = id; + timer->state = TIMER_STATE_WAITING; + timer->fp = fp; + timer->param = param; + timer->ctrl = ctrl; + addTimer(timer); + + const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] started"; + tmrTrace(fmt, ctrl->label, timer->id, timer->fp, timer->param); + + if (mseconds == 0) { + timer->wheel = tListLen(wheels); + timerAddRef(timer); + addToExpired(timer); } else { - tmrTrace("%s %p, timer stopped atomiclly, fp:%p, tmr_h:%p, total:%d", pCtrl->label, pObj->param1, pObj->fp, pObj, - pCtrl->numOfTmrs); + addToWheel(timer, mseconds); } - pthread_mutex_unlock(&pCtrl->mutex); + // note: use `timer->id` here is unsafe as `timer` may already be freed + return id; } -void taosTmrReset(void (*fp)(void *, void *), int mseconds, void *param1, void *handle, tmr_h *pTmrId) { - tmr_obj_t * pObj, *cNode, *pNode; - tmr_list_t *pList; - int index, period; - tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle; - - if (handle == NULL) return; - if (pTmrId == NULL) return; - - period = mseconds / pCtrl->resolution; - if (pthread_mutex_lock(&pCtrl->mutex) != 0) - tmrError("%s mutex lock failed, reason:%s", pCtrl->label, strerror(errno)); +tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle) { + tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle; + if (ctrl == NULL || ctrl->label[0] == 0) { + return NULL; + } - pObj = (tmr_obj_t *)(*pTmrId); + tmr_obj_t* timer = (tmr_obj_t*)calloc(1, sizeof(tmr_obj_t)); + if (timer == NULL) { + tmrError("failed to allocated memory for new timer object."); + return NULL; + } - if (pObj && pObj->timerId == *pTmrId) { - // exist, stop it first - pList = &(pCtrl->tmrList[pObj->index]); - if (pObj->prev) { - pObj->prev->next = pObj->next; - } else { - pList->head = pObj->next; - } + return (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl); +} - if (pObj->next) { - pObj->next->prev = pObj->prev; +static void taosTimerLoopFunc(int signo) { + int64_t now = taosGetTimestampMs(); + + for (int i = 0; i < tListLen(wheels); i++) { + // `expried` is a temporary expire list. + // expired timers are first add to this list, then move + // to expired queue as a batch to improve performance. + // note this list is used as a stack in this function. + tmr_obj_t* expired = NULL; + + time_wheel_t* wheel = wheels + i; + while (now >= wheel->nextScanAt) { + pthread_mutex_lock(&wheel->mutex); + wheel->index = (wheel->index + 1) % wheel->size; + tmr_obj_t* timer = wheel->slots[wheel->index]; + while (timer != NULL) { + tmr_obj_t* next = timer->next; + if (now < timer->expireAt) { + timer = next; + continue; + } + + // remove from the wheel + if (timer->prev == NULL) { + wheel->slots[wheel->index] = next; + if (next != NULL) { + next->prev = NULL; + } + } else { + timer->prev->next = next; + if (next != NULL) { + next->prev = timer->prev; + } + } + timer->wheel = tListLen(wheels); + + // add to temporary expire list + timer->next = expired; + timer->prev = NULL; + if (expired != NULL) { + expired->prev = timer; + } + expired = timer; + + timer = next; + } + pthread_mutex_unlock(&wheel->mutex); + wheel->nextScanAt += wheel->resolution; } - pList->count--; - pObj->timerId = NULL; - pCtrl->numOfTmrs--; - } else { - // timer not there, or already expired - pObj = (tmr_obj_t *)tmrMemPoolMalloc(pCtrl->poolHandle); - *pTmrId = pObj; - - if (pObj == NULL) { - tmrError("%s failed to allocate timer, max:%d allocated:%d", pCtrl->label, pCtrl->maxNumOfTmrs, pCtrl->numOfTmrs); - pthread_mutex_unlock(&pCtrl->mutex); - return; - } + addToExpired(expired); } +} - pObj->cycle = period / pCtrl->numOfPeriods; - pObj->param1 = param1; - pObj->fp = fp; - pObj->timerId = pObj; - pObj->pCtrl = pCtrl; +static bool doStopTimer(tmr_obj_t* timer, uint8_t state) { + bool reusable = false; - index = (period + pCtrl->periodsFromStart) % pCtrl->numOfPeriods; - pList = &(pCtrl->tmrList[index]); + if (state == TIMER_STATE_WAITING) { + if (removeFromWheel(timer)) { + removeTimer(timer->id); + // only safe to reuse the timer when timer is removed from the wheel. + // we cannot guarantee the thread safety of the timr in all other cases. + reusable = true; + } + const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] is cancelled."; + tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); + } else if (state != TIMER_STATE_EXPIRED) { + // timer already stopped or cancelled, has nothing to do in this case + } else if (timer->executedBy == pthreadGetId()) { + // taosTmrReset is called in the timer callback, should do nothing in this + // case to avoid dead lock. note taosTmrReset must be the last statement + // of the callback funtion, will be a bug otherwise. + } else { + assert(timer->executedBy != pthreadGetId()); - pObj->index = index; - cNode = pList->head; - pNode = NULL; + const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] fired, waiting..."; + tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); - while (cNode != NULL) { - if (cNode->cycle < pObj->cycle) { - pNode = cNode; - cNode = cNode->next; - } else { - break; + for (int i = 1; atomic_load_8(&timer->state) != TIMER_STATE_STOPPED; i++) { + if (i % 1000 == 0) { + sched_yield(); + } } - } - pObj->next = cNode; - pObj->prev = pNode; - - if (cNode != NULL) { - cNode->prev = pObj; + fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] stopped."; + tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); } - if (pNode != NULL) { - pNode->next = pObj; - } else { - pList->head = pObj; - } + return reusable; +} - pList->count++; - pCtrl->numOfTmrs++; +bool taosTmrStop(tmr_h timerId) { + uintptr_t id = (uintptr_t)timerId; - if (pthread_mutex_unlock(&pCtrl->mutex) != 0) - tmrError("%s mutex unlock failed, reason:%s", pCtrl->label, strerror(errno)); + tmr_obj_t* timer = findTimer(id); + if (timer == NULL) { + tmrTrace("timer[id=%lld] does not exist", id); + return false; + } - tmrTrace("%s %p, timer is reset, fp:%p, tmr_h:%p, index:%d, total:%d numOfFree:%d", pCtrl->label, param1, fp, pObj, - index, pCtrl->numOfTmrs, ((pool_t *)pCtrl->poolHandle)->numOfFree); + uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); + doStopTimer(timer, state); + timerDecRef(timer); - return; + return state == TIMER_STATE_WAITING; } -void taosTmrList(void *handle) { - int i; - tmr_list_t *pList; - tmr_obj_t * pObj; - tmr_ctrl_t *pCtrl = (tmr_ctrl_t *)handle; - - for (i = 0; i < pCtrl->numOfPeriods; ++i) { - pList = &(pCtrl->tmrList[i]); - pObj = pList->head; - if (!pObj) continue; - printf("\nindex=%d count:%d\n", i, pList->count); - while (pObj) { - pObj = pObj->next; - } - } +bool taosTmrStopA(tmr_h* timerId) { + bool ret = taosTmrStop(*timerId); + *timerId = NULL; + return ret; } -mpool_h tmrMemPoolInit(int numOfBlock, int blockSize) { - int i; - pool_t *pool_p; - - if (numOfBlock <= 1 || blockSize <= 1) { - tmrError("invalid parameter in memPoolInit\n"); +bool taosTmrReset(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle, tmr_h* pTmrId) { + tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle; + if (ctrl == NULL || ctrl->label[0] == 0) { return NULL; } - pool_p = (pool_t *)malloc(sizeof(pool_t)); - if (pool_p == NULL) { - tmrError("mempool malloc failed\n"); - return NULL; + uintptr_t id = (uintptr_t)*pTmrId; + bool stopped = false; + tmr_obj_t* timer = findTimer(id); + if (timer == NULL) { + tmrTrace("timer[id=%lld] does not exist", id); } else { - memset(pool_p, 0, sizeof(pool_t)); + uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_CANCELED); + if (!doStopTimer(timer, state)) { + timerDecRef(timer); + timer = NULL; + } + stopped = state == TIMER_STATE_WAITING; } - pool_p->blockSize = blockSize; - pool_p->numOfBlock = numOfBlock; - pool_p->pool = (char *)malloc(blockSize * numOfBlock); - pool_p->freeList = (int *)malloc(sizeof(int) * numOfBlock); - - if (pool_p->pool == NULL || pool_p->freeList == NULL) { - tmrError("failed to allocate memory\n"); - tfree(pool_p->freeList); - tfree(pool_p->pool); - free(pool_p); - return NULL; + if (timer == NULL) { + *pTmrId = taosTmrStart(fp, mseconds, param, handle); + return stopped; } - memset(pool_p->pool, 0, blockSize * numOfBlock); - for (i = 0; i < pool_p->numOfBlock; ++i) pool_p->freeList[i] = i; + tmrTrace("timer[id=%lld] is reused", timer->id); + + // wait until there's no other reference to this timer, + // so that we can reuse this timer safely. + for (int i = 1; atomic_load_8(&timer->refCount) > 1; ++i) { + if (i % 1000 == 0) { + sched_yield(); + } + } - pool_p->first = 0; - pool_p->numOfFree = pool_p->numOfBlock; + assert(timer->refCount == 1); + memset(timer, 0, sizeof(*timer)); + *pTmrId = (tmr_h)doStartTimer(timer, fp, mseconds, param, ctrl); - return (mpool_h)pool_p; + return stopped; } -char *tmrMemPoolMalloc(mpool_h handle) { - char * pos = NULL; - pool_t *pool_p = (pool_t *)handle; +static void taosTmrModuleInit(void) { + for (int i = 0; i < tListLen(tmrCtrls) - 1; ++i) { + tmr_ctrl_t* ctrl = tmrCtrls + i; + ctrl->next = ctrl + 1; + } + unusedTmrCtrl = tmrCtrls; - if (pool_p->numOfFree <= 0 || pool_p->numOfFree > pool_p->numOfBlock) { - tmrError("mempool: out of memory, numOfFree:%d, numOfBlock:%d", pool_p->numOfFree, pool_p->numOfBlock); - } else { - pos = pool_p->pool + pool_p->blockSize * (pool_p->freeList[pool_p->first]); - pool_p->first++; - pool_p->first = pool_p->first % pool_p->numOfBlock; - pool_p->numOfFree--; + pthread_mutex_init(&tmrCtrlMutex, NULL); + + int64_t now = taosGetTimestampMs(); + for (int i = 0; i < tListLen(wheels); i++) { + time_wheel_t* wheel = wheels + i; + if (pthread_mutex_init(&wheel->mutex, NULL) != 0) { + tmrError("failed to create the mutex for wheel, reason:%s", strerror(errno)); + return; + } + wheel->nextScanAt = now + wheel->resolution; + wheel->index = 0; + wheel->slots = (tmr_obj_t**)calloc(wheel->size, sizeof(tmr_obj_t*)); + if (wheel->slots == NULL) { + tmrError("failed to allocate wheel slots"); + return; + } + timerMap.size += wheel->size; } - return pos; -} + timerMap.count = 0; + timerMap.slots = (timer_list_t*)calloc(timerMap.size, sizeof(timer_list_t)); + if (timerMap.slots == NULL) { + tmrError("failed to allocate hash map"); + return; + } -void tmrMemPoolFree(mpool_h handle, char *pMem) { - int index; - pool_t *pool_p = (pool_t *)handle; + tmrQhandle = taosInitScheduler(10000, taosTmrThreads, "tmr"); + taosInitTimer(taosTimerLoopFunc, MSECONDS_PER_TICK); - if (pMem == NULL) return; + tmrTrace("timer module is initialized, number of threads: %d", taosTmrThreads); +} - index = (int)(pMem - pool_p->pool) / pool_p->blockSize; +void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) { + pthread_once(&tmrModuleInit, taosTmrModuleInit); - if (index < 0 || index >= pool_p->numOfBlock) { - tmrError("tmr mempool: error, invalid address:%p\n", pMem); - } else { - memset(pMem, 0, pool_p->blockSize); - pool_p->freeList[(pool_p->first + pool_p->numOfFree) % pool_p->numOfBlock] = index; - pool_p->numOfFree++; + pthread_mutex_lock(&tmrCtrlMutex); + tmr_ctrl_t* ctrl = unusedTmrCtrl; + if (ctrl != NULL) { + unusedTmrCtrl = ctrl->next; } + pthread_mutex_unlock(&tmrCtrlMutex); + + if (ctrl == NULL) { + tmrError("too many timer controllers, failed to create timer controller[label=%s].", label); + return NULL; + } + + strncpy(ctrl->label, label, sizeof(ctrl->label)); + ctrl->label[sizeof(ctrl->label) - 1] = 0; + tmrTrace("timer controller[label=%s] is initialized.", label); + return ctrl; } -void tmrMemPoolCleanUp(mpool_h handle) { - pool_t *pool_p = (pool_t *)handle; - if (pool_p == NULL) return; +void taosTmrCleanUp(void* handle) { + tmr_ctrl_t* ctrl = (tmr_ctrl_t*)handle; + assert(ctrl != NULL && ctrl->label[0] != 0); + + tmrTrace("timer controller[label=%s] is cleaned up.", ctrl->label); + ctrl->label[0] = 0; - if (pool_p->pool) free(pool_p->pool); - if (pool_p->freeList) free(pool_p->freeList); - memset(&pool_p, 0, sizeof(pool_p)); - free(pool_p); + pthread_mutex_lock(&tmrCtrlMutex); + ctrl->next = unusedTmrCtrl; + unusedTmrCtrl = ctrl; + pthread_mutex_unlock(&tmrCtrlMutex); }