diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 2a85068a95aa1944bfab9ad8f7f856775bd689de..9f8a0622ab299c4ccef55306f390106097d1284d 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -29,6 +29,7 @@ #include "ttimer.h" #include "tutil.h" +#define HASH_MAX_CAPACITY (1024*1024*16) #define HASH_VALUE_IN_TRASH (-1) #define HASH_DEFAULT_LOAD_FACTOR (0.75) #define HASH_INDEX(v, c) ((v) & ((c)-1)) @@ -81,24 +82,68 @@ typedef struct { * * when the node in pTrash does not be referenced, it will be release at the expired time */ - SDataNode * pTrash; - int numOfElemsInTrash; // number of element in trash - void * tmrCtrl; - void * pTimer; - SCacheStatis statistics; - _hashFunc hashFp; - -#if defined LINUX + SDataNode * pTrash; + void * tmrCtrl; + void * pTimer; + SCacheStatis statistics; + _hashFunc hashFp; + int numOfElemsInTrash; // number of element in trash + int16_t deleting; // set the deleting flag to stop refreshing asap. + int16_t refreshing; // if refreshing is invoked, it will be set 1 + +#if defined LINUX pthread_rwlock_t lock; #else - pthread_mutex_t mutex; + pthread_mutex_t lock; #endif } SCacheObj; -static FORCE_INLINE int32_t taosNormalHashTableLength(int32_t length) { +static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) { +#if defined LINUX + pthread_rwlock_wrlock(&pObj->lock); +#else + pthread_mutex_lock(&pObj->lock); +#endif +} + +static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) { +#if defined LINUX + pthread_rwlock_rdlock(&pObj->lock); +#else + pthread_mutex_lock(&pObj->lock); +#endif +} + +static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) { +#if defined LINUX + pthread_rwlock_unlock(&pObj->lock); +#else + pthread_mutex_unlock(&pObj->lock); +#endif +} + +static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) { +#if defined LINUX + return pthread_rwlock_init(&pObj->lock, NULL); +#else + return pthread_mutex_init(&pObj->lock, NULL); +#endif +} + +static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pObj) { +#if defined LINUX + pthread_rwlock_destroy(&pObj->lock); +#else + pthread_mutex_destroy(&pObj->lock); +#endif +} + +static FORCE_INLINE int32_t taosHashTableLength(int32_t length) { + int32_t trueLength = MIN(length, HASH_MAX_CAPACITY); + int32_t i = 4; - while (i < length) i = (i << 1); + while (i < trueLength) i = (i << 1); return i; } @@ -197,22 +242,15 @@ static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) { * may cause corruption. So, forece model only applys before cache is closed */ static void taosClearCacheTrash(SCacheObj *pObj, bool force) { -#if defined LINUX - pthread_rwlock_wrlock(&pObj->lock); -#else - pthread_mutex_lock(&pObj->mutex); -#endif + __cache_wr_lock(pObj); if (pObj->numOfElemsInTrash == 0) { if (pObj->pTrash != NULL) { pError("key:inconsistency data in cache, numOfElem in trash:%d", pObj->numOfElemsInTrash); } pObj->pTrash = NULL; -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif + + __cache_unlock(pObj); return; } @@ -239,11 +277,7 @@ static void taosClearCacheTrash(SCacheObj *pObj, bool force) { } assert(pObj->numOfElemsInTrash >= 0); -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif + __cache_unlock(pObj); } /** @@ -323,7 +357,7 @@ static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) { * @param keyLen key length * @return */ -static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, char *key, uint32_t keyLen) { +static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, const char *key, uint32_t keyLen) { uint32_t hash = (*pObj->hashFp)(key, keyLen); int32_t slot = HASH_INDEX(hash, pObj->capacity); @@ -358,8 +392,13 @@ static void taosHashTableResize(SCacheObj *pObj) { SDataNode *pNext = NULL; int32_t newSize = pObj->capacity << 1; + if (newSize > HASH_MAX_CAPACITY) { + pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", + pObj->capacity, HASH_MAX_CAPACITY); + return; + } - int64_t st = taosGetTimestampUs(); + int64_t st = taosGetTimestampUs(); SDataNode **pList = realloc(pObj->hashList, sizeof(SDataNode *) * newSize); if (pList == NULL) { pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity); @@ -367,6 +406,10 @@ static void taosHashTableResize(SCacheObj *pObj) { } pObj->hashList = pList; + + int32_t inc = newSize - pObj->capacity; + memset(&pObj->hashList[pObj->capacity], 0, inc * sizeof(SDataNode *)); + pObj->capacity = newSize; for (int32_t i = 0; i < pObj->capacity; ++i) { @@ -457,7 +500,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k // only a node is not referenced by any other object, in-place update it if (pNode->refCount == 0) { - size_t newSize = sizeof(SDataNode) + dataSize + keyLen; + size_t newSize = sizeof(SDataNode) + dataSize + (keyLen + 1); pNewNode = (SDataNode *)realloc(pNode, newSize); if (pNewNode == NULL) { @@ -542,11 +585,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i uint32_t keyLen = (uint32_t)strlen(key) + 1; -#if defined LINUX - pthread_rwlock_wrlock(&pObj->lock); -#else - pthread_mutex_lock(&pObj->mutex); -#endif + __cache_wr_lock(pObj); SDataNode *pOldNode = taosGetNodeFromHashTable(pObj, key, keyLen - 1); @@ -565,11 +604,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i pTrace("key:%s %p exist in cache, updated", key, pNode); } -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif + __cache_unlock(pObj); return (pNode != NULL) ? pNode->data : NULL; } @@ -612,17 +647,13 @@ void taosRemoveDataFromCache(void *handle, void **data, bool _remove) { *data = NULL; if (_remove) { -#if defined LINUX - pthread_rwlock_wrlock(&pObj->lock); -#else - pthread_mutex_lock(&pObj->mutex); -#endif + __cache_wr_lock(pObj); // pNode may be released immediately by other thread after the reference count of pNode is set to 0, // So we need to lock it in the first place. taosDecRef(pNode); taosCacheMoveNodeToTrash(pObj, pNode); - pthread_rwlock_unlock(&pObj->lock); + __cache_unlock(pObj); } else { taosDecRef(pNode); } @@ -640,22 +671,14 @@ void *taosGetDataFromCache(void *handle, char *key) { uint32_t keyLen = (uint32_t)strlen(key); -#if defined LINUX - pthread_rwlock_rdlock(&pObj->lock); -#else - pthread_mutex_lock(&pObj->mutex); -#endif + __cache_rd_lock(pObj); SDataNode *ptNode = taosGetNodeFromHashTable(handle, key, keyLen); if (ptNode != NULL) { __sync_add_and_fetch_32(&ptNode->refCount, 1); } -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif + __cache_unlock(pObj); if (ptNode != NULL) { __sync_add_and_fetch_32(&pObj->statistics.hitCount, 1); @@ -685,11 +708,7 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in uint32_t keyLen = strlen(key) + 1; -#if defined LINUX - pthread_rwlock_wrlock(&pObj->lock); -#else - pthread_mutex_lock(&pObj->mutex); -#endif + __cache_wr_lock(pObj); SDataNode *pNode = taosGetNodeFromHashTable(handle, key, keyLen - 1); @@ -702,25 +721,33 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in pTrace("key:%s updated.expireTime:%lld.refCnt:%d", key, pNode->time, pNode->refCount); } -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif - + __cache_unlock(pObj); return (pNew != NULL) ? pNew->data : NULL; } /** - * refresh cache to remove data in both hashlist and trash, if any nodes' refcount == 0, every pObj->refreshTime + * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pObj->refreshTime * @param handle Cache object handle */ void taosRefreshDataCache(void *handle, void *tmrId) { SDataNode *pNode, *pNext; SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL || (pObj->size == 0 && pObj->numOfElemsInTrash == 0)) { - taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); + if (pObj == NULL || pObj->capacity <= 0 || pObj->deleting == 1) { + pTrace("object is destroyed. no refresh retry"); + return; + } + + pObj->refreshing = 1; + +#if defined LINUX + __sync_synchronize(); +#else + MemoryBarrier(); +#endif + + if (pObj->deleting == 1) { + pObj->refreshing = 0; return; } @@ -730,14 +757,15 @@ void taosRefreshDataCache(void *handle, void *tmrId) { int32_t num = pObj->size; - for (int hash = 0; hash < pObj->capacity; ++hash) { -#if defined LINUX - pthread_rwlock_wrlock(&pObj->lock); -#else - pthread_mutex_lock(&pObj->mutex); -#endif + for (int i = 0; i < pObj->capacity; ++i) { + // in deleting process, quit refreshing immediately + if (pObj->deleting == 1) { + pObj->refreshing = 0; + return; + } - pNode = pObj->hashList[hash]; + __cache_wr_lock(pObj); + pNode = pObj->hashList[i]; while (pNode) { numOfCheck++; @@ -751,23 +779,23 @@ void taosRefreshDataCache(void *handle, void *tmrId) { /* all data have been checked, not need to iterate further */ if (numOfCheck == num || pObj->size <= 0) { -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif + __cache_unlock(pObj); break; } -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif + __cache_unlock(pObj); } - taosClearCacheTrash(pObj, false); - taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); + int16_t isDeleting = pObj->deleting; + pObj->refreshing = 0; + + // the SCacheObj may have been released now. + if (isDeleting == 1) { + return; + } else { + taosClearCacheTrash(pObj, false); + taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); + } } /** @@ -779,25 +807,22 @@ void taosClearDataCache(void *handle) { SDataNode *pNode, *pNext; SCacheObj *pObj = (SCacheObj *)handle; - for (int hash = 0; hash < pObj->capacity; ++hash) { -#if defined LINUX - pthread_rwlock_wrlock(&pObj->lock); -#else - pthread_mutex_lock(&pObj->mutex); -#endif + int32_t capacity = pObj->capacity; + + for (int i = 0; i < capacity; ++i) { + __cache_wr_lock(pObj); - pNode = pObj->hashList[hash]; + pNode = pObj->hashList[i]; while (pNode) { pNext = pNode->next; taosCacheMoveNodeToTrash(pObj, pNode); pNode = pNext; } -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif + + pObj->hashList[i] = NULL; + + __cache_unlock(pObj); } taosClearCacheTrash(pObj, false); @@ -823,8 +848,8 @@ void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) { } // the max slots is not defined by user - pObj->capacity = taosNormalHashTableLength(capacity); - assert((pObj->capacity & (pObj->capacity-1)) == 0); + pObj->capacity = taosHashTableLength(capacity); + assert((pObj->capacity & (pObj->capacity - 1)) == 0); pObj->hashFp = taosHashKey; pObj->refreshTime = refreshTime * 1000; @@ -839,11 +864,7 @@ void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) { pObj->tmrCtrl = tmrCtrl; taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); -#if defined LINUX - if (pthread_rwlock_init(&pObj->lock, NULL) != 0) { -#else - if (pthread_mutex_init(&pObj->mutex, NULL) != 0) { -#endif + if (__cache_lock_init(pObj) != 0) { taosTmrStopA(&pObj->pTimer); free(pObj->hashList); free(pObj); @@ -863,26 +884,35 @@ void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) { void taosCleanUpDataCache(void *handle) { SCacheObj *pObj; SDataNode *pNode, *pNext; - pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->capacity <= 0) { -#if defined LINUX - pthread_rwlock_destroy(&pObj->lock); -#else - pthread_mutex_destroy(&pObj->mutex); -#endif + + if (pObj == NULL) { + return; + } + + if (pObj->capacity <= 0) { + __cache_lock_destroy(pObj); + free(pObj); return; } taosTmrStopA(&pObj->pTimer); + pObj->deleting = 1; + #if defined LINUX - pthread_rwlock_wrlock(&pObj->lock); + __sync_synchronize(); #else - pthread_mutex_lock(&pObj->mutex); + MemoryBarrier(); #endif + while (pObj->refreshing == 1) { + taosMsleep(0); + } + + __cache_wr_lock(pObj); + if (pObj->hashList && pObj->size > 0) { for (int i = 0; i < pObj->capacity; ++i) { pNode = pObj->hashList[i]; @@ -893,22 +923,14 @@ void taosCleanUpDataCache(void *handle) { } } - free(pObj->hashList); + tfree(pObj->hashList); } -#if defined LINUX - pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif + __cache_unlock(pObj); taosClearCacheTrash(pObj, true); + __cache_lock_destroy(pObj); -#if defined LINUX - pthread_rwlock_destroy(&pObj->lock); -#else - pthread_mutex_destroy(&pObj->mutex); -#endif memset(pObj, 0, sizeof(SCacheObj)); free(pObj);