提交 285d732e 编写于 作者: H Haojun Liao

[td-225] update the hash func

上级 97616d2a
...@@ -20,7 +20,9 @@ ...@@ -20,7 +20,9 @@
extern "C" { extern "C" {
#endif #endif
#include "tarray.h"
#include "hashfunc.h" #include "hashfunc.h"
#include "tlockfree.h"
#define HASH_MAX_CAPACITY (1024 * 1024 * 16) #define HASH_MAX_CAPACITY (1024 * 1024 * 16)
#define HASH_DEFAULT_LOAD_FACTOR (0.75) #define HASH_DEFAULT_LOAD_FACTOR (0.75)
...@@ -37,23 +39,41 @@ typedef struct SHashNode { ...@@ -37,23 +39,41 @@ typedef struct SHashNode {
char *data; char *data;
} SHashNode; } SHashNode;
typedef enum SHashLockTypeE {
HASH_NO_LOCK = 0,
HASH_GLOBAL_LOCK = 1,
HASH_ENTRY_LOCK = 2,
} SHashLockTypeE;
typedef struct SHashLock {
#if defined(LINUX)
pthread_rwlock_t *lock;
#else
pthread_mutex_t *lock;
#endif
} SHashLock;
typedef struct SHashEntry {
int32_t num; // number of elements in current entry
SRWLatch latch; // entry latch
SHashNode head; // dummy head
} SHashEntry;
typedef struct SHashObj { typedef struct SHashObj {
SHashNode **hashList; SHashEntry **hashList;
size_t capacity; // number of slots size_t capacity; // number of slots
size_t size; // number of elements in hash table size_t size; // number of elements in hash table
_hash_fn_t hashFp; // hash function _hash_fn_t hashFp; // hash function
_hash_free_fn_t freeFp; // hash node free callback function _hash_free_fn_t freeFp; // hash node free callback function
SHashLock lock;
SHashLockTypeE lockType; // lock type
bool enableUpdate; // enable update bool enableUpdate; // enable update
#if defined(LINUX) SArray *pMemBlock; // memory block allocated for SHashEntry
pthread_rwlock_t *lock;
#else
pthread_mutex_t *lock;
#endif
} SHashObj; } SHashObj;
typedef struct SHashMutableIterator { typedef struct SHashMutableIterator {
SHashObj * pHashObj; SHashObj *pHashObj;
int32_t entryIndex; int32_t entryIndex;
SHashNode *pCur; SHashNode *pCur;
SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current
...@@ -68,7 +88,7 @@ typedef struct SHashMutableIterator { ...@@ -68,7 +88,7 @@ typedef struct SHashMutableIterator {
* @param threadsafe thread safe or not * @param threadsafe thread safe or not
* @return * @return
*/ */
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threadsafe); SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type);
/** /**
* return the size of hash table * return the size of hash table
...@@ -107,7 +127,7 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); ...@@ -107,7 +127,7 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen);
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen); int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen);
void* taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen); int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize);
/** /**
* clean up hash table * clean up hash table
...@@ -115,13 +135,6 @@ void* taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen); ...@@ -115,13 +135,6 @@ void* taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen);
*/ */
void taosHashCleanup(SHashObj *pHashObj); void taosHashCleanup(SHashObj *pHashObj);
/**
* Set the free callback function
* This function if set will be invoked right before freeing each hash node
* @param pHashObj
*/
void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp);
/** /**
* *
* @param pHashObj * @param pHashObj
......
...@@ -100,7 +100,19 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { ...@@ -100,7 +100,19 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) {
FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t hashVal) { FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t hashVal) {
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashNode *pNode = pHashObj->hashList[slot]; SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly
int32_t num = atomic_load_32(&pe->num);
if (num == 0) {
return NULL;
}
if (pHashObj->lockType == HASH_ENTRY_LOCK) {
taosRLockLatch(&pe->latch);
}
SHashNode* pNode = pe->head.next;
while (pNode) { while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
...@@ -110,6 +122,10 @@ FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *k ...@@ -110,6 +122,10 @@ FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *k
pNode = pNode->next; pNode = pNode->next;
} }
if (pHashObj->lockType == HASH_ENTRY_LOCK) {
taosRUnLockLatch(&pe->latch);
}
return pNode; return pNode;
} }
...@@ -154,7 +170,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNe ...@@ -154,7 +170,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNe
* @param pHashObj * @param pHashObj
* @param pNode * @param pNode
*/ */
static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode); static void pushfrontNode(SHashEntry* pEntry, SHashNode *pNode);
/** /**
* Get the next element in hash table for iterator * Get the next element in hash table for iterator
...@@ -163,7 +179,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode); ...@@ -163,7 +179,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode);
*/ */
static SHashNode *getNextHashNode(SHashMutableIterator *pIter); static SHashNode *getNextHashNode(SHashMutableIterator *pIter);
SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threadsafe) { SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTypeE type) {
if (capacity == 0 || fn == NULL) { if (capacity == 0 || fn == NULL) {
return NULL; return NULL;
} }
...@@ -179,24 +195,35 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threads ...@@ -179,24 +195,35 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threads
assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0); assert((pHashObj->capacity & (pHashObj->capacity - 1)) == 0);
pHashObj->hashFp = fn; pHashObj->hashFp = fn;
pHashObj->lockType = type;
pHashObj->enableUpdate = update; pHashObj->enableUpdate = update;
pHashObj->hashList = (SHashNode **)calloc(pHashObj->capacity, POINTER_BYTES); pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void*));
if (pHashObj->hashList == NULL) { if (pHashObj->hashList == NULL) {
free(pHashObj); free(pHashObj);
uError("failed to allocate memory, reason:%s", strerror(errno)); uError("failed to allocate memory, reason:%s", strerror(errno));
return NULL; return NULL;
} else {
pHashObj->pMemBlock = taosArrayInit(8, sizeof(void*));
void* p = calloc(pHashObj->capacity, sizeof(SHashEntry));
for(int32_t i = 0; i < pHashObj->capacity; ++i) {
pHashObj->hashList[i] = p + i * sizeof(SHashEntry);
}
taosArrayPush(pHashObj->pMemBlock, &p);
} }
if (threadsafe) { if (pHashObj->lockType != HASH_NO_LOCK) {
#if defined(LINUX) #if defined(LINUX)
pHashObj->lock = calloc(1, sizeof(pthread_rwlock_t)); pHashObj->lock.lock = calloc(1, sizeof(pthread_rwlock_t));
#else #else
pHashObj->lock = calloc(1, sizeof(pthread_mutex_t)); pHashObj->lock.lock = calloc(1, sizeof(pthread_mutex_t));
#endif #endif
} }
if (__lock_init(pHashObj->lock) != 0) { if (__lock_init(pHashObj->lock.lock) != 0) {
free(pHashObj->hashList); free(pHashObj->hashList);
free(pHashObj); free(pHashObj);
...@@ -208,11 +235,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threads ...@@ -208,11 +235,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, bool threads
} }
size_t taosHashGetSize(const SHashObj *pHashObj) { size_t taosHashGetSize(const SHashObj *pHashObj) {
if (pHashObj == NULL) { return (pHashObj == NULL)? 0:pHashObj->size;
return 0;
}
return pHashObj->size;
} }
int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) {
...@@ -222,17 +245,43 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -222,17 +245,43 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
return -1; return -1;
} }
__wr_lock(pHashObj->lock); // need the resize process, write lock applied
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal);
if (pNode == NULL) { // no data in hash table with the specified key, add it into hash table
if (HASH_NEED_RESIZE(pHashObj)) { if (HASH_NEED_RESIZE(pHashObj)) {
__wr_lock(pHashObj->lock.lock);
taosHashTableResize(pHashObj); taosHashTableResize(pHashObj);
__unlock(pHashObj->lock.lock);
} }
doAddToHashTable(pHashObj, pNewNode); __rd_lock(pHashObj->lock.lock);
__unlock(pHashObj->lock);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
if (pHashObj->lockType == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
}
SHashNode* pNode = pe->head.next;
while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal);
break;
}
pNode = pNode->next;
}
if (pNode == NULL) {
// no data in hash table with the specified key, add it into hash table
pushfrontNode(pe, pNewNode);
if (pHashObj->lockType == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
// enable resize
__unlock(pHashObj->lock.lock);
atomic_add_fetch_64(&pHashObj->size, 1);
return 0; return 0;
} else { } else {
...@@ -241,7 +290,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -241,7 +290,12 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
doUpdateHashNode(pNode, pNewNode); doUpdateHashNode(pNode, pNewNode);
} }
__unlock(pHashObj->lock); if (pHashObj->lockType == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
// enable resize
__unlock(pHashObj->lock.lock);
tfree(pNewNode->data) tfree(pNewNode->data)
tfree(pNewNode); tfree(pNewNode);
...@@ -251,11 +305,18 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da ...@@ -251,11 +305,18 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da
} }
void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) {
return NULL;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
__rd_lock(pHashObj->lock); // only add the read lock to disable the resize process
__rd_lock(pHashObj->lock.lock);
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal); SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal);
__unlock(pHashObj->lock);
__unlock(pHashObj->lock.lock);
if (pNode) { if (pNode) {
assert(pNode->hashVal == hashVal); assert(pNode->hashVal == hashVal);
...@@ -266,31 +327,59 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { ...@@ -266,31 +327,59 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) {
} }
int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
if (pHashObj->size <= 0) {
return -1;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
__wr_lock(pHashObj->lock); // disable the resize process
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal); __rd_lock(pHashObj->lock.lock);
if (pNode == NULL) {
__unlock(pHashObj->lock); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
SHashEntry *pe = pHashObj->hashList[slot];
// no data, return directly
if (pe->num == 0) {
__unlock(pHashObj->lock.lock);
return -1; return -1;
} }
SHashNode *pNext = pNode->next; if (pHashObj->lockType == HASH_ENTRY_LOCK) {
if (pNode->prev == NULL) { taosWLockLatch(&pe->latch);
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); }
assert(pHashObj->hashList[slot] == pNode);
pHashObj->hashList[slot] = pNext; SHashNode* pNode = pe->head.next;
} else { while (pNode) {
pNode->prev->next = pNext; if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal);
break;
}
pNode = pNode->next;
} }
if (pNode != NULL) {
assert(pNode->prev != NULL);
SHashNode *pNext = pNode->next;
pNode->prev->next = pNext;
if (pNext != NULL) { if (pNext != NULL) {
pNext->prev = pNode->prev; pNext->prev = pNode->prev;
} }
pHashObj->size -= 1; pe->num -= 1;
__unlock(pHashObj->lock); }
if (pHashObj->lockType == HASH_ENTRY_LOCK) {
taosWUnLockLatch(&pe->latch);
}
__unlock(pHashObj->lock.lock);
if (pNode != NULL) {
atomic_sub_fetch_64(&pHashObj->size, 1);
pNode->next = NULL; pNode->next = NULL;
pNode->prev = NULL; pNode->prev = NULL;
...@@ -299,58 +388,106 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { ...@@ -299,58 +388,106 @@ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) {
tfree(pNode); tfree(pNode);
return 0; return 0;
} else {
return -1;
}
} }
void* taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen) { int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize) {
if (pHashObj->size <= 0) {
return -1;
}
uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen); uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen);
__wr_lock(pHashObj->lock); // disable the resize process
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal); __rd_lock(pHashObj->lock.lock);
if (pNode == NULL) {
__unlock(pHashObj->lock);
return NULL;
}
SHashNode *pNext = pNode->next;
if (pNode->prev == NULL) {
int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity);
assert(pHashObj->hashList[slot] == pNode); SHashEntry *pe = pHashObj->hashList[slot];
pHashObj->hashList[slot] = pNext; // no data, return directly
} else { if (pe->num == 0) {
pNode->prev->next = pNext; __unlock(pHashObj->lock.lock);
return -1;
}
if (pHashObj->lockType == HASH_ENTRY_LOCK) {
taosWLockLatch(&pe->latch);
} }
SHashNode* pNode = pe->head.next;
while (pNode) {
if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) {
assert(pNode->hashVal == hashVal);
break;
}
pNode = pNode->next;
}
if (pNode != NULL) {
assert(pNode->prev != NULL);
SHashNode *pNext = pNode->next;
pNode->prev->next = pNext;
if (pNext != NULL) { if (pNext != NULL) {
pNext->prev = pNode->prev; pNext->prev = pNode->prev;
} }
}
pHashObj->size -= 1; if (pHashObj->lockType == HASH_ENTRY_LOCK) {
__unlock(pHashObj->lock); pe->num -= 1;
taosWUnLockLatch(&pe->latch);
}
__unlock(pHashObj->lock.lock);
atomic_sub_fetch_64(&pHashObj->size, 1);
if (data != NULL) {
memcpy(data, pNode->data, dsize);
}
if (pNode != NULL) {
pNode->next = NULL; pNode->next = NULL;
pNode->prev = NULL; pNode->prev = NULL;
return pNode; tfree(pNode->data);
tfree(pNode);
return 0;
} else {
return -1;
}
} }
void taosHashCleanup(SHashObj *pHashObj) { void taosHashCleanup(SHashObj *pHashObj) {
if (pHashObj == NULL) return; if (pHashObj == NULL) {
return;
}
SHashNode *pNode, *pNext; SHashNode *pNode, *pNext;
__wr_lock(pHashObj->lock); __wr_lock(pHashObj->lock.lock);
if (pHashObj->hashList) { if (pHashObj->hashList) {
for (int32_t i = 0; i < pHashObj->capacity; ++i) { for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i]; SHashEntry* pEntry = pHashObj->hashList[i];
if (pEntry->num == 0) {
assert(pEntry->head.next == 0);
continue;
}
pNode = pEntry->head.next;
while (pNode) { while (pNode) {
pNext = pNode->next; pNext = pNode->next;
if (pHashObj->freeFp) { if (pHashObj->freeFp) {
pHashObj->freeFp(pNode->data); pHashObj->freeFp(pNode->data);
} }
free(pNode->data);
free(pNode); free(pNode);
pNode = pNext; pNode = pNext;
} }
...@@ -359,10 +496,20 @@ void taosHashCleanup(SHashObj *pHashObj) { ...@@ -359,10 +496,20 @@ void taosHashCleanup(SHashObj *pHashObj) {
free(pHashObj->hashList); free(pHashObj->hashList);
} }
__unlock(pHashObj->lock); __unlock(pHashObj->lock.lock);
__lock_destroy(pHashObj->lock); __lock_destroy(pHashObj->lock.lock);
tfree(pHashObj->lock.lock);
// destroy mem block
size_t memBlock = taosArrayGetSize(pHashObj->pMemBlock);
for(int32_t i = 0; i < memBlock; ++i) {
void* p = taosArrayGetP(pHashObj->pMemBlock, i);
tfree(p);
}
taosArrayDestroy(pHashObj->pMemBlock);
tfree(pHashObj->lock);
memset(pHashObj, 0, sizeof(SHashObj)); memset(pHashObj, 0, sizeof(SHashObj));
free(pHashObj); free(pHashObj);
} }
...@@ -392,13 +539,13 @@ bool taosHashIterNext(SHashMutableIterator *pIter) { ...@@ -392,13 +539,13 @@ bool taosHashIterNext(SHashMutableIterator *pIter) {
assert(pIter->pCur == NULL && pIter->pNext == NULL); assert(pIter->pCur == NULL && pIter->pNext == NULL);
while (1) { while (1) {
SHashNode *pEntry = pIter->pHashObj->hashList[pIter->entryIndex]; SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
if (pEntry == NULL) { if (pEntry->num == 0) {
pIter->entryIndex++; pIter->entryIndex++;
continue; continue;
} }
pIter->pCur = pEntry; pIter->pCur = pEntry->head.next;
if (pIter->pCur->next) { if (pIter->pCur->next) {
pIter->pNext = pIter->pCur->next; pIter->pNext = pIter->pCur->next;
...@@ -451,19 +598,9 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) { ...@@ -451,19 +598,9 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
int32_t num = 0; int32_t num = 0;
for (int32_t i = 0; i < pHashObj->size; ++i) { for (int32_t i = 0; i < pHashObj->size; ++i) {
SHashNode *pEntry = pHashObj->hashList[i]; SHashEntry *pEntry = pHashObj->hashList[i];
if (pEntry == NULL) { if (num < pEntry->num) {
continue; num = pEntry->num;
}
int32_t j = 0;
while(pEntry != NULL) {
pEntry = pEntry->next;
j++;
}
if (num < j) {
num = j;
} }
} }
...@@ -471,7 +608,7 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) { ...@@ -471,7 +608,7 @@ int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
} }
void taosHashTableResize(SHashObj *pHashObj) { void taosHashTableResize(SHashObj *pHashObj) {
if (pHashObj->size < pHashObj->capacity * HASH_DEFAULT_LOAD_FACTOR) { if (!HASH_NEED_RESIZE(pHashObj)) {
return; return;
} }
...@@ -486,37 +623,43 @@ void taosHashTableResize(SHashObj *pHashObj) { ...@@ -486,37 +623,43 @@ void taosHashTableResize(SHashObj *pHashObj) {
return; return;
} }
int32_t pointerSize = POINTER_BYTES; void *pNewEntryList = realloc(pHashObj->hashList, sizeof(SHashEntry) * newSize);
void *pNewEntry = realloc(pHashObj->hashList, pointerSize * newSize); if (pNewEntryList == NULL) {// todo handle error
if (pNewEntry == NULL) {// todo handle error
// uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); // uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return; return;
} }
pHashObj->hashList = pNewEntry; pHashObj->hashList = pNewEntryList;
memset(&pHashObj->hashList[pHashObj->capacity], 0, POINTER_BYTES * (newSize - pHashObj->capacity));
pHashObj->capacity = newSize; size_t inc = newSize - pHashObj->capacity;
void* p = calloc(inc, sizeof(SHashEntry));
for(int32_t i = 0; i < inc; ++i) {
pHashObj->hashList[i + pHashObj->capacity] = p + i * sizeof(SHashEntry);
}
taosArrayPush(pHashObj->pMemBlock, &p);
pHashObj->capacity = newSize;
for (int32_t i = 0; i < pHashObj->capacity; ++i) { for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pNode = pHashObj->hashList[i]; SHashEntry* pe = pHashObj->hashList[i];
if (pNode != NULL) { if (pe->num == 0) {
assert(pNode->prev == NULL); assert(pe->head.next == NULL);
continue;
} }
pNode = pe->head.next;
while (pNode) { while (pNode) {
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity); int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
if (j == i) { // this key locates in the same slot, no need to relocate it if (j == i) { // this key locates in the same slot, no need to relocate it
pNode = pNode->next; pNode = pNode->next;
assert(pNode == NULL || pNode->next != pNode);
} else { } else {
pNext = pNode->next; pNext = pNode->next;
assert(pNode != pNext && (pNext == NULL || pNext->prev == pNode) && pNode->prev->next == pNode);
if (pNode->prev == NULL) { // first node of the overflow linked list assert(pNode->prev != NULL);
pHashObj->hashList[i] = pNext;
} else {
pNode->prev->next = pNext; pNode->prev->next = pNext;
}
if (pNext != NULL) { if (pNext != NULL) {
pNext->prev = pNode->prev; pNext->prev = pNode->prev;
} }
...@@ -524,16 +667,11 @@ void taosHashTableResize(SHashObj *pHashObj) { ...@@ -524,16 +667,11 @@ void taosHashTableResize(SHashObj *pHashObj) {
// clear pointer // clear pointer
pNode->next = NULL; pNode->next = NULL;
pNode->prev = NULL; pNode->prev = NULL;
pe->num -= 1;
// added into new slot // added into new slot
SHashNode *pNew = pHashObj->hashList[j]; SHashEntry *pNewEntry = pHashObj->hashList[j];
if (pNew != NULL) { pushfrontNode(pNewEntry, pNode);
assert(pNew->prev == NULL);
pNew->prev = pNode;
}
pNode->next = pNew;
pHashObj->hashList[j] = pNode;
// continue // continue
pNode = pNext; pNode = pNext;
...@@ -563,21 +701,19 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s ...@@ -563,21 +701,19 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s
return pNewNode; return pNewNode;
} }
void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) { void pushfrontNode(SHashEntry* pEntry, SHashNode *pNode) {
assert(pNode != NULL); assert(pNode != NULL && pEntry != NULL);
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
SHashNode* pEntry = pHashObj->hashList[index]; SHashNode* pNext = pEntry->head.next;
if (pEntry != NULL) { if (pNext != NULL) {
pEntry->prev = pNode; pNext->prev = pNode;
pNode->next = pEntry;
pNode->prev = NULL;
} }
pHashObj->hashList[index] = pNode; pNode->next = pNext;
pHashObj->size++; pNode->prev = &pEntry->head;
pEntry->head.next = pNode;
pEntry->num += 1;
} }
SHashNode *getNextHashNode(SHashMutableIterator *pIter) { SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
...@@ -585,13 +721,13 @@ SHashNode *getNextHashNode(SHashMutableIterator *pIter) { ...@@ -585,13 +721,13 @@ SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
pIter->entryIndex++; pIter->entryIndex++;
while (pIter->entryIndex < pIter->pHashObj->capacity) { while (pIter->entryIndex < pIter->pHashObj->capacity) {
SHashNode *pNode = pIter->pHashObj->hashList[pIter->entryIndex]; SHashEntry*pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
if (pNode == NULL) { if (pEntry->num == 0) {
pIter->entryIndex++; pIter->entryIndex++;
continue; continue;
} }
return pNode; return pEntry->head.next;
} }
return NULL; return NULL;
......
...@@ -207,7 +207,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext ...@@ -207,7 +207,7 @@ SCacheObj *taosCacheInit(int32_t keyType, int64_t refreshTimeInSeconds, bool ext
return NULL; return NULL;
} }
pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false, true); pCacheObj->pHashTable = taosHashInit(128, taosGetDefaultHashFunction(keyType), false, HASH_ENTRY_LOCK);
pCacheObj->name = strdup(cacheName); pCacheObj->name = strdup(cacheName);
if (pCacheObj->pHashTable == NULL) { if (pCacheObj->pHashTable == NULL) {
free(pCacheObj); free(pCacheObj);
...@@ -249,7 +249,6 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -249,7 +249,6 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
return NULL; return NULL;
} }
// __cache_wr_lock(pCacheObj);
T_REF_INC(pNode1); T_REF_INC(pNode1);
int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)); int32_t succ = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
...@@ -261,23 +260,27 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v ...@@ -261,23 +260,27 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v
(int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize); (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, (int64_t)dataSize);
} else { // duplicated key exists } else { // duplicated key exists
while (1) { while (1) {
// todo removed by node, instead of by key SCacheDataNode* p = NULL;
SHashNode *p = taosHashRemoveNode(pCacheObj->pHashTable, key, keyLen); int32_t ret = taosHashRemoveNode(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*));
// add to trashcan // add to trashcan
if (p != NULL) { if (ret == 0) {
SCacheDataNode* pCachedNode = *(SCacheDataNode**)p->data; if (T_REF_VAL_GET(p) == 0) {
if (T_REF_VAL_GET(pCachedNode) == 0) {
tfree(pCachedNode); if (pCacheObj->freeFp) {
pCacheObj->freeFp(p->data);
}
tfree(p);
} else { } else {
taosAddToTrash(pCacheObj, pCachedNode); taosAddToTrash(pCacheObj, p);
uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, pCachedNode); uDebug("cache:%s, key:%p, %p exist in cache, updated old:%p", pCacheObj->name, key, pNode1->data, p);
} }
} }
assert(T_REF_VAL_GET(pNode1) == 1); assert(T_REF_VAL_GET(pNode1) == 1);
int32_t ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *)); ret = taosHashPut(pCacheObj->pHashTable, key, keyLen, &pNode1, sizeof(void *));
if (ret == 0) { if (ret == 0) {
atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size); atomic_add_fetch_64(&pCacheObj->totalSize, pNode1->size);
...@@ -430,14 +433,16 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -430,14 +433,16 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
* NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread * NOTE: previous ref is 0, and current ref is still 0, remove it. If previous is not 0, there is another thread
* that tries to do the same thing. * that tries to do the same thing.
*/ */
if (pNode->inTrashCan) { if (inTrashCan) {
if (ref == 0) { if (ref == 0) {
assert(pNode->pTNodeHeader->pData == pNode); assert(pNode->pTNodeHeader->pData == pNode);
// todo add lock here
taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader); taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader);
} }
} else { } else {
int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize);
if (ret == 0) { if (ret == 0) { // successfully remove from hash table
if (ref > 0) { if (ref > 0) {
assert(pNode->pTNodeHeader == NULL); assert(pNode->pTNodeHeader == NULL);
...@@ -459,6 +464,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { ...@@ -459,6 +464,7 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) {
// NOTE: once refcount is decrease, pNode may be freed by other thread immediately. // NOTE: once refcount is decrease, pNode may be freed by other thread immediately.
int32_t ref = T_REF_DEC(pNode); int32_t ref = T_REF_DEC(pNode);
// todo so, invalid read here!
uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, pNode->key, pNode->data, uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, pNode->key, pNode->data,
ref, inTrashCan); ref, inTrashCan);
} }
......
...@@ -10,7 +10,7 @@ ...@@ -10,7 +10,7 @@
namespace { namespace {
// the simple test code for basic operations // the simple test code for basic operations
void simpleTest() { void simpleTest() {
auto* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false); SHashObj* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
ASSERT_EQ(taosHashGetSize(hashTable), 0); ASSERT_EQ(taosHashGetSize(hashTable), 0);
// put 400 elements in the hash table // put 400 elements in the hash table
...@@ -47,7 +47,7 @@ void simpleTest() { ...@@ -47,7 +47,7 @@ void simpleTest() {
} }
void stringKeyTest() { void stringKeyTest() {
auto* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false); auto* hashTable = (SHashObj*) taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
ASSERT_EQ(taosHashGetSize(hashTable), 0); ASSERT_EQ(taosHashGetSize(hashTable), 0);
char key[128] = {0}; char key[128] = {0};
...@@ -97,7 +97,7 @@ void functionTest() { ...@@ -97,7 +97,7 @@ void functionTest() {
* a single threads situation * a single threads situation
*/ */
void noLockPerformanceTest() { void noLockPerformanceTest() {
auto* hashTable = (SHashObj*) taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false); auto* hashTable = (SHashObj*) taosHashInit(4096, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
ASSERT_EQ(taosHashGetSize(hashTable), 0); ASSERT_EQ(taosHashGetSize(hashTable), 0);
char key[128] = {0}; char key[128] = {0};
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册