提交 da3688f8 编写于 作者: H hjxilinx

fix issue #584

上级 5227c732
......@@ -30,6 +30,8 @@
#include "tutil.h"
#define HASH_VALUE_IN_TRASH (-1)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
/**
* todo: refactor to extract the hash table out of cache structure
......@@ -40,10 +42,12 @@ typedef struct SCacheStatis {
int64_t totalAccess;
int64_t refreshCount;
int32_t numOfCollision;
int32_t numOfResize;
int64_t resizeTime;
} SCacheStatis;
typedef struct _cache_node_t {
char * key; /* null-terminated string */
char * key; // null-terminated string
struct _cache_node_t *prev;
struct _cache_node_t *next;
uint64_t addTime; // the time when this element is added or updated into cache
......@@ -55,19 +59,18 @@ typedef struct _cache_node_t {
* if this value is larger than 0, this value will never be released
*/
uint32_t refCount;
int32_t hashVal; /* the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash*/
uint32_t nodeSize; /* allocated size for current SDataNode */
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
uint32_t nodeSize; // allocated size for current SDataNode
char data[];
} SDataNode;
typedef int (*_hashFunc)(int, char *, uint32_t);
typedef uint32_t (*_hashFunc)(const char *, uint32_t);
typedef struct {
SDataNode **hashList;
int maxSessions;
int total;
int64_t totalSize; /* total allocated buffer in this hash table, SCacheObj is not included. */
int capacity;
int size;
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime;
/*
......@@ -78,19 +81,13 @@ typedef struct {
*
* when the node in pTrash does not be referenced, it will be release at the expired time
*/
SDataNode *pTrash;
SDataNode * pTrash;
int numOfElemsInTrash; // number of element in trash
void *tmrCtrl;
void *pTimer;
void * tmrCtrl;
void * pTimer;
SCacheStatis statistics;
_hashFunc hashFp;
/*
* pthread_rwlock_t will block ops on the windows platform, when refresh is called.
* so use pthread_mutex_t as an alternative
*/
#if defined LINUX
pthread_rwlock_t lock;
#else
......@@ -125,8 +122,7 @@ static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const cha
}
memcpy(pNewNode->data, pData, dataSize);
pNewNode->addTime = (uint64_t) taosGetTimestampMs();
pNewNode->addTime = (uint64_t)taosGetTimestampMs();
pNewNode->time = pNewNode->addTime + lifespan;
pNewNode->key = pNewNode->data + dataSize;
......@@ -140,20 +136,12 @@ static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const cha
/**
* hash key function
* @param pObj cache object
*
* @param key key string
* @param len length of key
* @return hash value
*/
static FORCE_INLINE int taosHashKey(int maxSessions, char *key, uint32_t len) {
uint32_t hash = MurmurHash3_32(key, len);
// avoid the costly remainder operation
assert((maxSessions & (maxSessions - 1)) == 0);
hash = hash & (maxSessions - 1);
return hash;
}
static FORCE_INLINE uint32_t taosHashKey(const char *key, uint32_t len) { return MurmurHash3_32(key, len); }
/**
* add object node into trash, and this object is closed for referencing if it is add to trash
......@@ -162,8 +150,7 @@ static FORCE_INLINE int taosHashKey(int maxSessions, char *key, uint32_t len) {
* @param pNode Cache slot object
*/
static void taosAddToTrash(SCacheObj *pObj, SDataNode *pNode) {
if (pNode->hashVal == HASH_VALUE_IN_TRASH) {
/* node is already in trash */
if (pNode->hashVal == HASH_VALUE_IN_TRASH) { /* node is already in trash */
return;
}
......@@ -209,7 +196,7 @@ static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) {
* @param force force model, if true, remove data in trash without check refcount.
* may cause corruption. So, forece model only applys before cache is closed
*/
static void taosClearCacheTrash(SCacheObj *pObj, _Bool force) {
static void taosClearCacheTrash(SCacheObj *pObj, bool force) {
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
......@@ -264,18 +251,17 @@ static void taosClearCacheTrash(SCacheObj *pObj, _Bool force) {
* @param pObj cache object
* @param pNode Cache slot object
*/
static void taosAddToHashTable(SCacheObj *pObj, SDataNode *pNode) {
assert(pNode->hashVal >= 0);
pNode->next = pObj->hashList[pNode->hashVal];
static void taosAddNodeToHashTable(SCacheObj *pObj, SDataNode *pNode) {
int32_t slotIndex = HASH_INDEX(pNode->hashVal, pObj->capacity);
pNode->next = pObj->hashList[slotIndex];
if (pObj->hashList[pNode->hashVal] != 0) {
(pObj->hashList[pNode->hashVal])->prev = pNode;
if (pObj->hashList[slotIndex] != NULL) {
(pObj->hashList[slotIndex])->prev = pNode;
pObj->statistics.numOfCollision++;
}
pObj->hashList[pNode->hashVal] = pNode;
pObj->hashList[slotIndex] = pNode;
pObj->total++;
pObj->size++;
pObj->totalSize += pNode->nodeSize;
pTrace("key:%s %p add to hash table", pNode->key, pNode);
......@@ -290,18 +276,17 @@ static void taosRemoveNodeInHashTable(SCacheObj *pObj, SDataNode *pNode) {
if (pNode->hashVal == HASH_VALUE_IN_TRASH) return;
SDataNode *pNext = pNode->next;
if (pNode->prev) {
if (pNode->prev != NULL) {
pNode->prev->next = pNext;
} else {
/* the node is in hashlist, remove it */
pObj->hashList[pNode->hashVal] = pNext;
} else { /* the node is in hashlist, remove it */
pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNext;
}
if (pNext) {
if (pNext != NULL) {
pNext->prev = pNode->prev;
}
pObj->total--;
pObj->size--;
pObj->totalSize -= pNode->nodeSize;
pNode->next = NULL;
......@@ -321,7 +306,7 @@ static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) {
if (pNode->prev) {
pNode->prev->next = pNode;
} else {
pObj->hashList[pNode->hashVal] = pNode;
pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNode;
}
if (pNode->next) {
......@@ -339,9 +324,11 @@ static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) {
* @return
*/
static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, char *key, uint32_t keyLen) {
int hash = (*pObj->hashFp)(pObj->maxSessions, key, keyLen);
uint32_t hash = (*pObj->hashFp)(key, keyLen);
int32_t slot = HASH_INDEX(hash, pObj->capacity);
SDataNode *pNode = pObj->hashList[slot];
SDataNode *pNode = pObj->hashList[hash];
while (pNode) {
if (strcmp(pNode->key, key) == 0) break;
......@@ -349,12 +336,84 @@ static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, char *key, uint32_t
}
if (pNode) {
assert(pNode->hashVal == hash);
assert(HASH_INDEX(pNode->hashVal, pObj->capacity) == slot);
}
return pNode;
}
/**
* resize the hash list if the threshold is reached
*
* @param pObj
*/
static void taosHashTableResize(SCacheObj *pObj) {
if (pObj->size < pObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
return;
}
// double the original capacity
pObj->statistics.numOfResize++;
SDataNode *pNode = NULL;
SDataNode *pNext = NULL;
int32_t newSize = pObj->capacity << 1;
int64_t st = taosGetTimestampUs();
SDataNode **pList = realloc(pObj->hashList, sizeof(SDataNode *) * newSize);
if (pList == NULL) {
pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity);
return;
}
pObj->hashList = pList;
pObj->capacity = newSize;
for (int32_t i = 0; i < pObj->capacity; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
int32_t j = HASH_INDEX(pNode->hashVal, pObj->capacity);
if (j == i) { // this key resides in the same slot, no need to relocate it
pNode = pNode->next;
} else {
pNext = pNode->next;
// remove from current slot
if (pNode->prev != NULL) {
pNode->prev->next = pNode->next;
} else {
pObj->hashList[i] = pNode->next;
}
if (pNode->next != NULL) {
(pNode->next)->prev = pNode->prev;
}
// added into new slot
pNode->next = NULL;
pNode->prev = NULL;
pNode->next = pObj->hashList[j];
if (pObj->hashList[j] != NULL) {
(pObj->hashList[j])->prev = pNode;
}
pObj->hashList[j] = pNode;
// continue
pNode = pNext;
}
}
}
int64_t et = taosGetTimestampUs();
pObj->statistics.resizeTime += (et - st);
pTrace("cache resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pObj->capacity,
((double)pObj->size) / pObj->capacity, (et - st) / 1000.0);
}
/**
* release node
* @param pObj cache object
......@@ -367,7 +426,7 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SDataNode *pNode)
return;
}
pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->total, pObj->totalSize);
pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->size, pObj->totalSize);
pNode->signature = 0;
free(pNode);
}
......@@ -396,7 +455,7 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
uint32_t dataSize, uint64_t keepTime) {
SDataNode *pNewNode = NULL;
/* only a node is not referenced by any other object, in-place update it */
// only a node is not referenced by any other object, in-place update it
if (pNode->refCount == 0) {
size_t newSize = sizeof(SDataNode) + dataSize + keyLen;
......@@ -411,7 +470,13 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
pNewNode->key = pNewNode->data + dataSize;
strcpy(pNewNode->key, key);
// update the timestamp information for updated key/value
pNewNode->addTime = taosGetTimestampMs();
pNewNode->time = pNewNode->addTime + keepTime;
__sync_add_and_fetch_32(&pNewNode->refCount, 1);
// the address of this node may be changed, so the prev and next element should update the corresponding pointer
taosUpdateInHashTable(pObj, pNewNode);
} else {
int32_t hashVal = pNode->hashVal;
......@@ -424,11 +489,11 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
__sync_add_and_fetch_32(&pNewNode->refCount, 1);
assert(hashVal == (*pObj->hashFp)(pObj->maxSessions, key, keyLen - 1));
assert(hashVal == (*pObj->hashFp)(key, keyLen - 1));
pNewNode->hashVal = hashVal;
/* add new one to hashtable */
taosAddToHashTable(pObj, pNewNode);
// add new element to hashtable
taosAddNodeToHashTable(pObj, pNewNode);
}
return pNewNode;
......@@ -452,8 +517,8 @@ static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, ui
}
__sync_add_and_fetch_32(&pNode->refCount, 1);
pNode->hashVal = (*pObj->hashFp)(pObj->maxSessions, key, keyLen - 1);
taosAddToHashTable(pObj, pNode);
pNode->hashVal = (*pObj->hashFp)(key, keyLen - 1);
taosAddNodeToHashTable(pObj, pNode);
return pNode;
}
......@@ -473,7 +538,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
SCacheObj *pObj;
pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
if (pObj == NULL || pObj->capacity == 0) return NULL;
uint32_t keyLen = (uint32_t)strlen(key) + 1;
......@@ -486,9 +551,14 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
SDataNode *pOldNode = taosGetNodeFromHashTable(pObj, key, keyLen - 1);
if (pOldNode == NULL) { // do add to cache
// check if the threshold is reached
taosHashTableResize(pObj);
pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, keepTime * 1000L);
pTrace("key:%s %p added into cache, slot:%d, addTime:%lld, expireTime:%lld, cache total:%d, "
"size:%lldbytes, collision:%d", pNode->key, pNode, pNode->hashVal, pNode->addTime, pNode->time, pObj->total,
pTrace(
"key:%s %p added into cache, slot:%d, addTime:%lld, expireTime:%lld, cache total:%d, "
"size:%lldbytes, collision:%d",
pNode->key, pNode, HASH_INDEX(pNode->hashVal, pObj->capacity), pNode->addTime, pNode->time, pObj->size,
pObj->totalSize, pObj->statistics.numOfCollision);
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L);
......@@ -504,7 +574,7 @@ void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, i
return (pNode != NULL) ? pNode->data : NULL;
}
static FORCE_INLINE void taosDecRef(SDataNode* pNode) {
static FORCE_INLINE void taosDecRef(SDataNode *pNode) {
if (pNode == NULL) {
return;
}
......@@ -527,9 +597,9 @@ static FORCE_INLINE void taosDecRef(SDataNode* pNode) {
* @param handle
* @param data
*/
void taosRemoveDataFromCache(void *handle, void **data, bool remove) {
void taosRemoveDataFromCache(void *handle, void **data, bool _remove) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->maxSessions == 0 || (*data) == NULL || (pObj->total + pObj->numOfElemsInTrash == 0)) return;
if (pObj == NULL || pObj->capacity == 0 || (*data) == NULL || (pObj->size + pObj->numOfElemsInTrash == 0)) return;
size_t offset = offsetof(SDataNode, data);
SDataNode *pNode = (SDataNode *)((char *)(*data) - offset);
......@@ -552,11 +622,7 @@ void taosRemoveDataFromCache(void *handle, void **data, bool remove) {
taosDecRef(pNode);
taosCacheMoveNodeToTrash(pObj, pNode);
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->mutex);
#endif
} else {
taosDecRef(pNode);
}
......@@ -570,7 +636,7 @@ void taosRemoveDataFromCache(void *handle, void **data, bool remove) {
*/
void *taosGetDataFromCache(void *handle, char *key) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
if (pObj == NULL || pObj->capacity == 0) return NULL;
uint32_t keyLen = (uint32_t)strlen(key);
......@@ -592,14 +658,14 @@ void *taosGetDataFromCache(void *handle, char *key) {
#endif
if (ptNode != NULL) {
__sync_add_and_fetch_64(&pObj->statistics.hitCount, 1);
__sync_add_and_fetch_32(&pObj->statistics.hitCount, 1);
pTrace("key:%s is retrieved from cache,refcnt:%d", key, ptNode->refCount);
} else {
__sync_add_and_fetch_64(&pObj->statistics.missCount, 1);
__sync_add_and_fetch_32(&pObj->statistics.missCount, 1);
pTrace("key:%s not in cache,retrieved failed", key);
}
__sync_add_and_fetch_64(&pObj->statistics.totalAccess, 1);
__sync_add_and_fetch_32(&pObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? ptNode->data : NULL;
}
......@@ -613,7 +679,7 @@ void *taosGetDataFromCache(void *handle, char *key) {
*/
void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->maxSessions == 0) return NULL;
if (pObj == NULL || pObj->capacity == 0) return NULL;
SDataNode *pNew = NULL;
......@@ -626,9 +692,10 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in
#endif
SDataNode *pNode = taosGetNodeFromHashTable(handle, key, keyLen - 1);
if (pNode == NULL) { // object has been released, do add operation
pNew = taosAddToCacheImpl(pObj, key, keyLen, pData, size, duration * 1000L);
pWarn("key:%s does not exist, update failed,do add to cache.total:%d,size:%ldbytes", key, pObj->total,
pWarn("key:%s does not exist, update failed,do add to cache.total:%d,size:%ldbytes", key, pObj->size,
pObj->totalSize);
} else {
pNew = taosUpdateCacheImpl(pObj, pNode, key, keyLen, pData, size, duration * 1000L);
......@@ -652,7 +719,7 @@ void taosRefreshDataCache(void *handle, void *tmrId) {
SDataNode *pNode, *pNext;
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || (pObj->total == 0 && pObj->numOfElemsInTrash == 0)) {
if (pObj == NULL || (pObj->size == 0 && pObj->numOfElemsInTrash == 0)) {
taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
return;
}
......@@ -661,8 +728,9 @@ void taosRefreshDataCache(void *handle, void *tmrId) {
uint32_t numOfCheck = 0;
pObj->statistics.refreshCount++;
int32_t num = pObj->total;
for (int hash = 0; hash < pObj->maxSessions; ++hash) {
int32_t num = pObj->size;
for (int hash = 0; hash < pObj->capacity; ++hash) {
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
......@@ -682,7 +750,7 @@ void taosRefreshDataCache(void *handle, void *tmrId) {
}
/* all data have been checked, not need to iterate further */
if (numOfCheck == num || pObj->total <= 0) {
if (numOfCheck == num || pObj->size <= 0) {
#if defined LINUX
pthread_rwlock_unlock(&pObj->lock);
#else
......@@ -711,7 +779,7 @@ void taosClearDataCache(void *handle) {
SDataNode *pNode, *pNext;
SCacheObj *pObj = (SCacheObj *)handle;
for (int hash = 0; hash < pObj->maxSessions; ++hash) {
for (int hash = 0; hash < pObj->capacity; ++hash) {
#if defined LINUX
pthread_rwlock_wrlock(&pObj->lock);
#else
......@@ -737,14 +805,14 @@ void taosClearDataCache(void *handle) {
/**
*
* @param maxSessions maximum slots available for hash elements
* @param capacity maximum slots available for hash elements
* @param tmrCtrl timer ctrl
* @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and
* not referenced by other objects
* @return
*/
void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTime) {
if (tmrCtrl == NULL || refreshTime <= 0 || maxSessions <= 0) {
void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) {
if (tmrCtrl == NULL || refreshTime <= 0 || capacity <= 0) {
return NULL;
}
......@@ -754,12 +822,14 @@ void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTime) {
return NULL;
}
pObj->maxSessions = taosNormalHashTableLength(maxSessions);
// the max slots is not defined by user
pObj->capacity = taosNormalHashTableLength(capacity);
assert((pObj->capacity & (pObj->capacity-1)) == 0);
pObj->hashFp = taosHashKey;
pObj->refreshTime = refreshTime * 1000;
pObj->hashList = (SDataNode **)calloc(1, sizeof(SDataNode *) * pObj->maxSessions);
pObj->hashList = (SDataNode **)calloc(1, sizeof(SDataNode *) * pObj->capacity);
if (pObj->hashList == NULL) {
free(pObj);
pError("failed to allocate memory, reason:%s", strerror(errno));
......@@ -795,7 +865,7 @@ void taosCleanUpDataCache(void *handle) {
SDataNode *pNode, *pNext;
pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->maxSessions <= 0) {
if (pObj == NULL || pObj->capacity <= 0) {
#if defined LINUX
pthread_rwlock_destroy(&pObj->lock);
#else
......@@ -813,8 +883,8 @@ void taosCleanUpDataCache(void *handle) {
pthread_mutex_lock(&pObj->mutex);
#endif
if (pObj->hashList && pObj->total > 0) {
for (int i = 0; i < pObj->maxSessions; ++i) {
if (pObj->hashList && pObj->size > 0) {
for (int i = 0; i < pObj->capacity; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册