diff --git a/src/os/darwin/inc/os.h b/src/os/darwin/inc/os.h index 1444da5dded7c1c7fd561074edfd779b2b2c19d3..83a56483b8463155a91c3e15cf5ff0db39d8d600 100644 --- a/src/os/darwin/inc/os.h +++ b/src/os/darwin/inc/os.h @@ -84,8 +84,8 @@ #define __sync_add_and_fetch_64 __sync_add_and_fetch #define __sync_add_and_fetch_32 __sync_add_and_fetch #define __sync_add_and_fetch_16 __sync_add_and_fetch -#define __sync_add_and_fetch_ptr __sync_add_and_fetch #define __sync_add_and_fetch_8 __sync_add_and_fetch +#define __sync_add_and_fetch_ptr __sync_add_and_fetch #define __sync_sub_and_fetch_64 __sync_sub_and_fetch #define __sync_sub_and_fetch_32 __sync_sub_and_fetch diff --git a/src/os/linux/inc/os.h b/src/os/linux/inc/os.h index 39b10023a338a7227378dfaab056b288915fd93b..78065e6df0590755d098563316fbe214c695658d 100644 --- a/src/os/linux/inc/os.h +++ b/src/os/linux/inc/os.h @@ -100,8 +100,8 @@ extern "C" { #define __sync_add_and_fetch_64 __sync_add_and_fetch #define __sync_add_and_fetch_32 __sync_add_and_fetch #define __sync_add_and_fetch_16 __sync_add_and_fetch -#define __sync_add_and_fetch_ptr __sync_add_and_fetch #define __sync_add_and_fetch_8 __sync_add_and_fetch +#define __sync_add_and_fetch_ptr __sync_add_and_fetch #define __sync_sub_and_fetch_64 __sync_sub_and_fetch #define __sync_sub_and_fetch_32 __sync_sub_and_fetch diff --git a/src/os/windows/inc/os.h b/src/os/windows/inc/os.h index d308fff9731af37b3a7cb26f3dc6d93ec18b4156..3f61ff12b53fdca1c60d827978109dcbdffc6318 100644 --- a/src/os/windows/inc/os.h +++ b/src/os/windows/inc/os.h @@ -127,16 +127,19 @@ extern "C" { #define __sync_val_compare_and_swap_64(ptr, oldval, newval) _InterlockedCompareExchange64((__int64 volatile*)(ptr), (__int64)(newval), (__int64)(oldval)) #define __sync_val_compare_and_swap_ptr(ptr, oldval, newval) _InterlockedCompareExchangePointer((void* volatile*)(ptr), (void*)(newval), (void*)(oldval)) -char interlocked_add_8(char volatile* ptr, char val); -short interlocked_add_16(short volatile* ptr, short val); +char interlocked_add_8(char volatile *ptr, char val); +short interlocked_add_16(short volatile *ptr, short val); +long interlocked_add_32(long volatile *ptr, long val); +__int64 interlocked_add_64(__int64 volatile *ptr, __int64 val); + #define __sync_add_and_fetch_8(ptr, val) interlocked_add_8((char volatile*)(ptr), (char)(val)) #define __sync_add_and_fetch_16(ptr, val) interlocked_add_16((short volatile*)(ptr), (short)(val)) -#define __sync_add_and_fetch_32(ptr, val) _InterlockedAdd((long volatile*)(ptr), (long)(val)) -#define __sync_add_and_fetch_64(ptr, val) _InterlockedAdd64((__int64 volatile*)(ptr), (__int64)(val)) +#define __sync_add_and_fetch_32(ptr, val) interlocked_add_32((long volatile*)(ptr), (long)(val)) +#define __sync_add_and_fetch_64(ptr, val) interlocked_add_64((__int64 volatile*)(ptr), (__int64)(val)) #ifdef _WIN64 - #define __sync_add_and_fetch_ptr atomic_add_fetch_64 + #define __sync_add_and_fetch_ptr __sync_add_and_fetch_64 #else - #define __sync_add_and_fetch_ptr atomic_add_fetch_32 + #define __sync_add_and_fetch_ptr __sync_add_and_fetch_32 #endif #define __sync_sub_and_fetch_8(ptr, val) __sync_add_and_fetch_8((ptr), -(val)) diff --git a/src/os/windows/src/twindows.c b/src/os/windows/src/twindows.c index 8cc36ae4db5f6bfb4e5fce15b5f4681da16cd806..106cb903b170a4798af6ad715ecb62f9093ba550 100644 --- a/src/os/windows/src/twindows.c +++ b/src/os/windows/src/twindows.c @@ -43,8 +43,11 @@ void taosResetPthread(pthread_t *thread) { } int64_t taosGetPthreadId() { - pthread_t id = pthread_self(); - return (int64_t)id.p; +#ifdef PTW32_VERSION + return pthread_getw32threadid_np(pthread_self()); +#else + return (int64_t)pthread_self(); +#endif } int taosSetSockOpt(int socketfd, int level, int optname, void *optval, int optlen) { @@ -72,6 +75,14 @@ short interlocked_add_16(short volatile* ptr, short val) { return _InterlockedExchangeAdd16(ptr, val) + val; } +long interlocked_add_32(long volatile* ptr, long val) { + return _InterlockedExchangeAdd(ptr, val) + val; +} + +__int64 interlocked_add_64(__int64 volatile* ptr, __int64 val) { + return _InterlockedExchangeAdd64(ptr, val) + val; +} + int32_t __sync_val_load_32(int32_t *ptr) { return InterlockedOr(ptr, 0); } diff --git a/src/util/src/ttimer.c b/src/util/src/ttimer.c index 85fc4d459565618dd0b222939f4fa6338d0d0b26..798e63a64a51efa008968fd5070331c49620eaac 100644 --- a/src/util/src/ttimer.c +++ b/src/util/src/ttimer.c @@ -27,15 +27,6 @@ #include "tutil.h" -static uintptr_t pthreadGetId() { -#ifdef PTW32_VERSION - return pthread_getw32threadid_np(pthread_self()); -#else - assert(sizeof(pthread_t) == sizeof(uintptr_t)); - return (uintptr_t)pthread_self(); -#endif -} - #define TIMER_STATE_WAITING 0 #define TIMER_STATE_EXPIRED 1 #define TIMER_STATE_STOPPED 2 @@ -63,15 +54,15 @@ typedef struct tmr_obj_t { uint8_t reserved1; uint16_t reserved2; union { - int64_t expireAt; - uintptr_t executedBy; + int64_t expireAt; + int64_t executedBy; }; TAOS_TMR_CALLBACK fp; void* param; } tmr_obj_t; typedef struct timer_list_t { - uintptr_t lockedBy; + int64_t lockedBy; tmr_obj_t* timers; } timer_list_t; @@ -125,9 +116,9 @@ static void timerDecRef(tmr_obj_t* timer) { } static void lockTimerList(timer_list_t* list) { - uintptr_t tid = pthreadGetId(); + int64_t tid = taosGetPthreadId(); int i = 0; - while (__sync_val_compare_and_swap_ptr(&(list->lockedBy), 0, tid) != 0) { + while (__sync_val_compare_and_swap_64(&(list->lockedBy), 0, tid) != 0) { if (++i % 1000 == 0) { sched_yield(); } @@ -135,8 +126,8 @@ static void lockTimerList(timer_list_t* list) { } static void unlockTimerList(timer_list_t* list) { - uintptr_t tid = pthreadGetId(); - if (__sync_val_compare_and_swap_ptr(&(list->lockedBy), tid, 0) != tid) { + int64_t tid = taosGetPthreadId(); + if (__sync_val_compare_and_swap_64(&(list->lockedBy), tid, 0) != tid) { assert(false); tmrError("trying to unlock a timer list not locked by current thread."); } @@ -262,7 +253,7 @@ static bool removeFromWheel(tmr_obj_t* timer) { static void processExpiredTimer(void* handle, void* arg) { tmr_obj_t* timer = (tmr_obj_t*)handle; - timer->executedBy = pthreadGetId(); + timer->executedBy = taosGetPthreadId(); uint8_t state = __sync_val_compare_and_swap_8(&timer->state, TIMER_STATE_WAITING, TIMER_STATE_EXPIRED); if (state == TIMER_STATE_WAITING) { const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] execution start."; @@ -402,12 +393,12 @@ static bool doStopTimer(tmr_obj_t* timer, uint8_t state) { tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param); } else if (state != TIMER_STATE_EXPIRED) { // timer already stopped or cancelled, has nothing to do in this case - } else if (timer->executedBy == pthreadGetId()) { + } else if (timer->executedBy == taosGetPthreadId()) { // taosTmrReset is called in the timer callback, should do nothing in this // case to avoid dead lock. note taosTmrReset must be the last statement // of the callback funtion, will be a bug otherwise. } else { - assert(timer->executedBy != pthreadGetId()); + assert(timer->executedBy != taosGetPthreadId()); const char* fmt = "timer[label=%s, id=%lld, fp=%p, param=%p] fired, waiting..."; tmrTrace(fmt, timer->ctrl->label, timer->id, timer->fp, timer->param);