diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index 5508c5d2cdb6e8d73466969166d45780e8fc49b9..eecf5100c50bfc02df137f780470fe97c2df2b1a 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -30,6 +30,8 @@ #include "tutil.h" #define HASH_VALUE_IN_TRASH (-1) +#define HASH_DEFAULT_LOAD_FACTOR (0.75) +#define HASH_INDEX(v, c) ((v) & ((c)-1)) /** * todo: refactor to extract the hash table out of cache structure @@ -40,10 +42,12 @@ typedef struct SCacheStatis { int64_t totalAccess; int64_t refreshCount; int32_t numOfCollision; + int32_t numOfResize; + int64_t resizeTime; } SCacheStatis; typedef struct _cache_node_t { - char * key; /* null-terminated string */ + char * key; // null-terminated string struct _cache_node_t *prev; struct _cache_node_t *next; uint64_t addTime; // the time when this element is added or updated into cache @@ -55,20 +59,19 @@ typedef struct _cache_node_t { * if this value is larger than 0, this value will never be released */ uint32_t refCount; - int32_t hashVal; /* the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash*/ - uint32_t nodeSize; /* allocated size for current SDataNode */ + uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash + uint32_t nodeSize; // allocated size for current SDataNode char data[]; } SDataNode; -typedef int (*_hashFunc)(int, char *, uint32_t); +typedef uint32_t (*_hashFunc)(const char *, uint32_t); typedef struct { SDataNode **hashList; - int maxSessions; - int total; - - int64_t totalSize; /* total allocated buffer in this hash table, SCacheObj is not included. */ - int64_t refreshTime; + int capacity; + int size; + int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. + int64_t refreshTime; /* * to accommodate the old datanode which has the same key value of new one in hashList @@ -78,19 +81,13 @@ typedef struct { * * when the node in pTrash does not be referenced, it will be release at the expired time */ - SDataNode *pTrash; - int numOfElemsInTrash; // number of element in trash - - void *tmrCtrl; - void *pTimer; + SDataNode * pTrash; + int numOfElemsInTrash; // number of element in trash + void * tmrCtrl; + void * pTimer; + SCacheStatis statistics; + _hashFunc hashFp; - SCacheStatis statistics; - _hashFunc hashFp; - - /* - * pthread_rwlock_t will block ops on the windows platform, when refresh is called. - * so use pthread_mutex_t as an alternative - */ #if defined LINUX pthread_rwlock_t lock; #else @@ -125,8 +122,7 @@ static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const cha } memcpy(pNewNode->data, pData, dataSize); - - pNewNode->addTime = (uint64_t) taosGetTimestampMs(); + pNewNode->addTime = (uint64_t)taosGetTimestampMs(); pNewNode->time = pNewNode->addTime + lifespan; pNewNode->key = pNewNode->data + dataSize; @@ -140,20 +136,12 @@ static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const cha /** * hash key function - * @param pObj cache object + * * @param key key string * @param len length of key * @return hash value */ -static FORCE_INLINE int taosHashKey(int maxSessions, char *key, uint32_t len) { - uint32_t hash = MurmurHash3_32(key, len); - - // avoid the costly remainder operation - assert((maxSessions & (maxSessions - 1)) == 0); - hash = hash & (maxSessions - 1); - - return hash; -} +static FORCE_INLINE uint32_t taosHashKey(const char *key, uint32_t len) { return MurmurHash3_32(key, len); } /** * add object node into trash, and this object is closed for referencing if it is add to trash @@ -162,8 +150,7 @@ static FORCE_INLINE int taosHashKey(int maxSessions, char *key, uint32_t len) { * @param pNode Cache slot object */ static void taosAddToTrash(SCacheObj *pObj, SDataNode *pNode) { - if (pNode->hashVal == HASH_VALUE_IN_TRASH) { - /* node is already in trash */ + if (pNode->hashVal == HASH_VALUE_IN_TRASH) { /* node is already in trash */ return; } @@ -209,7 +196,7 @@ static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) { * @param force force model, if true, remove data in trash without check refcount. * may cause corruption. So, forece model only applys before cache is closed */ -static void taosClearCacheTrash(SCacheObj *pObj, _Bool force) { +static void taosClearCacheTrash(SCacheObj *pObj, bool force) { #if defined LINUX pthread_rwlock_wrlock(&pObj->lock); #else @@ -225,7 +212,7 @@ static void taosClearCacheTrash(SCacheObj *pObj, _Bool force) { pthread_rwlock_unlock(&pObj->lock); #else pthread_mutex_unlock(&pObj->mutex); -#endif +#endif return; } @@ -264,18 +251,17 @@ static void taosClearCacheTrash(SCacheObj *pObj, _Bool force) { * @param pObj cache object * @param pNode Cache slot object */ -static void taosAddToHashTable(SCacheObj *pObj, SDataNode *pNode) { - assert(pNode->hashVal >= 0); +static void taosAddNodeToHashTable(SCacheObj *pObj, SDataNode *pNode) { + int32_t slotIndex = HASH_INDEX(pNode->hashVal, pObj->capacity); + pNode->next = pObj->hashList[slotIndex]; - pNode->next = pObj->hashList[pNode->hashVal]; - - if (pObj->hashList[pNode->hashVal] != 0) { - (pObj->hashList[pNode->hashVal])->prev = pNode; + if (pObj->hashList[slotIndex] != NULL) { + (pObj->hashList[slotIndex])->prev = pNode; pObj->statistics.numOfCollision++; } - pObj->hashList[pNode->hashVal] = pNode; + pObj->hashList[slotIndex] = pNode; - pObj->total++; + pObj->size++; pObj->totalSize += pNode->nodeSize; pTrace("key:%s %p add to hash table", pNode->key, pNode); @@ -290,18 +276,17 @@ static void taosRemoveNodeInHashTable(SCacheObj *pObj, SDataNode *pNode) { if (pNode->hashVal == HASH_VALUE_IN_TRASH) return; SDataNode *pNext = pNode->next; - if (pNode->prev) { + if (pNode->prev != NULL) { pNode->prev->next = pNext; - } else { - /* the node is in hashlist, remove it */ - pObj->hashList[pNode->hashVal] = pNext; + } else { /* the node is in hashlist, remove it */ + pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNext; } - if (pNext) { + if (pNext != NULL) { pNext->prev = pNode->prev; } - pObj->total--; + pObj->size--; pObj->totalSize -= pNode->nodeSize; pNode->next = NULL; @@ -321,7 +306,7 @@ static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) { if (pNode->prev) { pNode->prev->next = pNode; } else { - pObj->hashList[pNode->hashVal] = pNode; + pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNode; } if (pNode->next) { @@ -339,9 +324,11 @@ static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) { * @return */ static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, char *key, uint32_t keyLen) { - int hash = (*pObj->hashFp)(pObj->maxSessions, key, keyLen); + uint32_t hash = (*pObj->hashFp)(key, keyLen); + + int32_t slot = HASH_INDEX(hash, pObj->capacity); + SDataNode *pNode = pObj->hashList[slot]; - SDataNode *pNode = pObj->hashList[hash]; while (pNode) { if (strcmp(pNode->key, key) == 0) break; @@ -349,12 +336,84 @@ static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, char *key, uint32_t } if (pNode) { - assert(pNode->hashVal == hash); + assert(HASH_INDEX(pNode->hashVal, pObj->capacity) == slot); } return pNode; } +/** + * resize the hash list if the threshold is reached + * + * @param pObj + */ +static void taosHashTableResize(SCacheObj *pObj) { + if (pObj->size < pObj->capacity * HASH_DEFAULT_LOAD_FACTOR) { + return; + } + + // double the original capacity + pObj->statistics.numOfResize++; + SDataNode *pNode = NULL; + SDataNode *pNext = NULL; + + int32_t newSize = pObj->capacity << 1; + + int64_t st = taosGetTimestampUs(); + SDataNode **pList = realloc(pObj->hashList, sizeof(SDataNode *) * newSize); + if (pList == NULL) { + pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity); + return; + } + + pObj->hashList = pList; + pObj->capacity = newSize; + + for (int32_t i = 0; i < pObj->capacity; ++i) { + pNode = pObj->hashList[i]; + + while (pNode) { + int32_t j = HASH_INDEX(pNode->hashVal, pObj->capacity); + if (j == i) { // this key resides in the same slot, no need to relocate it + pNode = pNode->next; + } else { + pNext = pNode->next; + + // remove from current slot + if (pNode->prev != NULL) { + pNode->prev->next = pNode->next; + } else { + pObj->hashList[i] = pNode->next; + } + + if (pNode->next != NULL) { + (pNode->next)->prev = pNode->prev; + } + + // added into new slot + pNode->next = NULL; + pNode->prev = NULL; + + pNode->next = pObj->hashList[j]; + + if (pObj->hashList[j] != NULL) { + (pObj->hashList[j])->prev = pNode; + } + pObj->hashList[j] = pNode; + + // continue + pNode = pNext; + } + } + } + + int64_t et = taosGetTimestampUs(); + pObj->statistics.resizeTime += (et - st); + + pTrace("cache resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pObj->capacity, + ((double)pObj->size) / pObj->capacity, (et - st) / 1000.0); +} + /** * release node * @param pObj cache object @@ -367,7 +426,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SDataNode *pNode) return; } - pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->total, pObj->totalSize); + pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->size, pObj->totalSize); pNode->signature = 0; free(pNode); } @@ -396,7 +455,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k uint32_t dataSize, uint64_t keepTime) { SDataNode *pNewNode = NULL; - /* only a node is not referenced by any other object, in-place update it */ + // only a node is not referenced by any other object, in-place update it if (pNode->refCount == 0) { size_t newSize = sizeof(SDataNode) + dataSize + keyLen; @@ -411,7 +470,13 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k pNewNode->key = pNewNode->data + dataSize; strcpy(pNewNode->key, key); + // update the timestamp information for updated key/value + pNewNode->addTime = taosGetTimestampMs(); + pNewNode->time = pNewNode->addTime + keepTime; + __sync_add_and_fetch_32(&pNewNode->refCount, 1); + + // the address of this node may be changed, so the prev and next element should update the corresponding pointer taosUpdateInHashTable(pObj, pNewNode); } else { int32_t hashVal = pNode->hashVal; @@ -424,11 +489,11 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k __sync_add_and_fetch_32(&pNewNode->refCount, 1); - assert(hashVal == (*pObj->hashFp)(pObj->maxSessions, key, keyLen - 1)); + assert(hashVal == (*pObj->hashFp)(key, keyLen - 1)); pNewNode->hashVal = hashVal; - /* add new one to hashtable */ - taosAddToHashTable(pObj, pNewNode); + // add new element to hashtable + taosAddNodeToHashTable(pObj, pNewNode); } return pNewNode; @@ -452,8 +517,8 @@ static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, ui } __sync_add_and_fetch_32(&pNode->refCount, 1); - pNode->hashVal = (*pObj->hashFp)(pObj->maxSessions, key, keyLen - 1); - taosAddToHashTable(pObj, pNode); + pNode->hashVal = (*pObj->hashFp)(key, keyLen - 1); + taosAddNodeToHashTable(pObj, pNode); return pNode; } @@ -473,7 +538,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i SCacheObj *pObj; pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; + if (pObj == NULL || pObj->capacity == 0) return NULL; uint32_t keyLen = (uint32_t)strlen(key) + 1; @@ -486,10 +551,15 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i SDataNode *pOldNode = taosGetNodeFromHashTable(pObj, key, keyLen - 1); if (pOldNode == NULL) { // do add to cache + // check if the threshold is reached + taosHashTableResize(pObj); + pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, keepTime * 1000L); - pTrace("key:%s %p added into cache, slot:%d, addTime:%lld, expireTime:%lld, cache total:%d, " - "size:%lldbytes, collision:%d", pNode->key, pNode, pNode->hashVal, pNode->addTime, pNode->time, pObj->total, - pObj->totalSize, pObj->statistics.numOfCollision); + pTrace( + "key:%s %p added into cache, slot:%d, addTime:%lld, expireTime:%lld, cache total:%d, " + "size:%lldbytes, collision:%d", + pNode->key, pNode, HASH_INDEX(pNode->hashVal, pObj->capacity), pNode->addTime, pNode->time, pObj->size, + pObj->totalSize, pObj->statistics.numOfCollision); } else { // old data exists, update the node pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L); pTrace("key:%s %p exist in cache, updated", key, pNode); @@ -504,7 +574,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i return (pNode != NULL) ? pNode->data : NULL; } -static FORCE_INLINE void taosDecRef(SDataNode* pNode) { +static FORCE_INLINE void taosDecRef(SDataNode *pNode) { if (pNode == NULL) { return; } @@ -527,9 +597,9 @@ static FORCE_INLINE void taosDecRef(SDataNode* pNode) { * @param handle * @param data */ -void taosRemoveDataFromCache(void *handle, void **data, bool remove) { +void taosRemoveDataFromCache(void *handle, void **data, bool _remove) { SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->maxSessions == 0 || (*data) == NULL || (pObj->total + pObj->numOfElemsInTrash == 0)) return; + if (pObj == NULL || pObj->capacity == 0 || (*data) == NULL || (pObj->size + pObj->numOfElemsInTrash == 0)) return; size_t offset = offsetof(SDataNode, data); SDataNode *pNode = (SDataNode *)((char *)(*data) - offset); @@ -540,7 +610,7 @@ void taosRemoveDataFromCache(void *handle, void **data, bool remove) { } *data = NULL; - + if (remove) { #if defined LINUX pthread_rwlock_wrlock(&pObj->lock); @@ -552,11 +622,7 @@ void taosRemoveDataFromCache(void *handle, void **data, bool remove) { taosDecRef(pNode); taosCacheMoveNodeToTrash(pObj, pNode); -#if defined LINUX pthread_rwlock_unlock(&pObj->lock); -#else - pthread_mutex_unlock(&pObj->mutex); -#endif } else { taosDecRef(pNode); } @@ -570,7 +636,7 @@ void taosRemoveDataFromCache(void *handle, void **data, bool remove) { */ void *taosGetDataFromCache(void *handle, char *key) { SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; + if (pObj == NULL || pObj->capacity == 0) return NULL; uint32_t keyLen = (uint32_t)strlen(key); @@ -592,14 +658,14 @@ void *taosGetDataFromCache(void *handle, char *key) { #endif if (ptNode != NULL) { - __sync_add_and_fetch_64(&pObj->statistics.hitCount, 1); - + __sync_add_and_fetch_32(&pObj->statistics.hitCount, 1); pTrace("key:%s is retrieved from cache,refcnt:%d", key, ptNode->refCount); } else { - __sync_add_and_fetch_64(&pObj->statistics.missCount, 1); + __sync_add_and_fetch_32(&pObj->statistics.missCount, 1); pTrace("key:%s not in cache,retrieved failed", key); } - __sync_add_and_fetch_64(&pObj->statistics.totalAccess, 1); + + __sync_add_and_fetch_32(&pObj->statistics.totalAccess, 1); return (ptNode != NULL) ? ptNode->data : NULL; } @@ -613,7 +679,7 @@ void *taosGetDataFromCache(void *handle, char *key) { */ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration) { SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->maxSessions == 0) return NULL; + if (pObj == NULL || pObj->capacity == 0) return NULL; SDataNode *pNew = NULL; @@ -626,9 +692,10 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in #endif SDataNode *pNode = taosGetNodeFromHashTable(handle, key, keyLen - 1); + if (pNode == NULL) { // object has been released, do add operation pNew = taosAddToCacheImpl(pObj, key, keyLen, pData, size, duration * 1000L); - pWarn("key:%s does not exist, update failed,do add to cache.total:%d,size:%ldbytes", key, pObj->total, + pWarn("key:%s does not exist, update failed,do add to cache.total:%d,size:%ldbytes", key, pObj->size, pObj->totalSize); } else { pNew = taosUpdateCacheImpl(pObj, pNode, key, keyLen, pData, size, duration * 1000L); @@ -652,7 +719,7 @@ void taosRefreshDataCache(void *handle, void *tmrId) { SDataNode *pNode, *pNext; SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL || (pObj->total == 0 && pObj->numOfElemsInTrash == 0)) { + if (pObj == NULL || (pObj->size == 0 && pObj->numOfElemsInTrash == 0)) { taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); return; } @@ -661,8 +728,9 @@ void taosRefreshDataCache(void *handle, void *tmrId) { uint32_t numOfCheck = 0; pObj->statistics.refreshCount++; - int32_t num = pObj->total; - for (int hash = 0; hash < pObj->maxSessions; ++hash) { + int32_t num = pObj->size; + + for (int hash = 0; hash < pObj->capacity; ++hash) { #if defined LINUX pthread_rwlock_wrlock(&pObj->lock); #else @@ -682,7 +750,7 @@ void taosRefreshDataCache(void *handle, void *tmrId) { } /* all data have been checked, not need to iterate further */ - if (numOfCheck == num || pObj->total <= 0) { + if (numOfCheck == num || pObj->size <= 0) { #if defined LINUX pthread_rwlock_unlock(&pObj->lock); #else @@ -711,7 +779,7 @@ void taosClearDataCache(void *handle) { SDataNode *pNode, *pNext; SCacheObj *pObj = (SCacheObj *)handle; - for (int hash = 0; hash < pObj->maxSessions; ++hash) { + for (int hash = 0; hash < pObj->capacity; ++hash) { #if defined LINUX pthread_rwlock_wrlock(&pObj->lock); #else @@ -737,14 +805,14 @@ void taosClearDataCache(void *handle) { /** * - * @param maxSessions maximum slots available for hash elements + * @param capacity maximum slots available for hash elements * @param tmrCtrl timer ctrl * @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and * not referenced by other objects * @return */ -void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTime) { - if (tmrCtrl == NULL || refreshTime <= 0 || maxSessions <= 0) { +void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) { + if (tmrCtrl == NULL || refreshTime <= 0 || capacity <= 0) { return NULL; } @@ -754,12 +822,14 @@ void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTime) { return NULL; } - pObj->maxSessions = taosNormalHashTableLength(maxSessions); + // the max slots is not defined by user + pObj->capacity = taosNormalHashTableLength(capacity); + assert((pObj->capacity & (pObj->capacity-1)) == 0); pObj->hashFp = taosHashKey; pObj->refreshTime = refreshTime * 1000; - pObj->hashList = (SDataNode **)calloc(1, sizeof(SDataNode *) * pObj->maxSessions); + pObj->hashList = (SDataNode **)calloc(1, sizeof(SDataNode *) * pObj->capacity); if (pObj->hashList == NULL) { free(pObj); pError("failed to allocate memory, reason:%s", strerror(errno)); @@ -795,7 +865,7 @@ void taosCleanUpDataCache(void *handle) { SDataNode *pNode, *pNext; pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->maxSessions <= 0) { + if (pObj == NULL || pObj->capacity <= 0) { #if defined LINUX pthread_rwlock_destroy(&pObj->lock); #else @@ -813,8 +883,8 @@ void taosCleanUpDataCache(void *handle) { pthread_mutex_lock(&pObj->mutex); #endif - if (pObj->hashList && pObj->total > 0) { - for (int i = 0; i < pObj->maxSessions; ++i) { + if (pObj->hashList && pObj->size > 0) { + for (int i = 0; i < pObj->capacity; ++i) { pNode = pObj->hashList[i]; while (pNode) { pNext = pNode->next;