diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index e22716241d6ffdc554f6f9381743c9eca4ed6776..688bf317d6860fe9475656b3e59ea5ff109488b2 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -76,8 +76,9 @@ typedef struct SHashMutableIterator { SHashObj *pHashObj; int32_t entryIndex; SHashNode *pCur; - SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current - int32_t num; // already check number of elements in hash table + SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current + size_t numOfChecked; // already check number of elements in hash table + size_t numOfEntries; // number of entries while the iterator is created } SHashMutableIterator; /** @@ -118,6 +119,8 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da */ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); +void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void(*fp)(void*)); + /** * remove item with the specified key * @param pHashObj @@ -126,8 +129,9 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen); */ int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen); +int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize); -int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize); +int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param); /** * clean up hash table diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 171c8ec100597d2d708fe4324e4546260e379425..be4baf85d2695fc4879545e7995937215777c071 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -19,7 +19,7 @@ #include "tulog.h" #include "tutil.h" -#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) +#define HASH_NEED_RESIZE(_h) ((_h)->size >= (_h)->capacity * HASH_DEFAULT_LOAD_FACTOR) #define FREE_HASH_NODE(_n) \ do { \ @@ -75,8 +75,8 @@ static FORCE_INLINE int32_t taosHashCapacity(int32_t length) { * @return */ -static FORCE_INLINE SHashNode* doSearchEntryList(SHashEntry* pe, const void* key, size_t keyLen, uint32_t hashVal) { - SHashNode* pNode = pe->head.next; +static FORCE_INLINE SHashNode *doSearchInEntryList(SHashEntry *pe, const void *key, size_t keyLen, uint32_t hashVal) { + SHashNode *pNode = pe->head.next; while (pNode) { if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { assert(pNode->hashVal == hashVal); @@ -89,7 +89,8 @@ static FORCE_INLINE SHashNode* doSearchEntryList(SHashEntry* pe, const void* key return pNode; } -static FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, uint32_t hashVal) { +static FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const void *key, uint32_t keyLen, + uint32_t hashVal) { int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; @@ -103,7 +104,7 @@ static FORCE_INLINE SHashNode *doGetNodeFromHashTable(SHashObj *pHashObj, const taosRLockLatch(&pe->latch); } - SHashNode* pNode = doSearchEntryList(pe, key, keyLen, hashVal); + SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal); if (pHashObj->type == HASH_ENTRY_LOCK) { taosRUnLockLatch(&pe->latch); @@ -141,8 +142,8 @@ static SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *p */ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNewNode) { assert(pNode->keyLen == pNewNode->keyLen); - SWAP(pNode->key, pNewNode->key, void*); - SWAP(pNode->data, pNewNode->data, void*); + SWAP(pNode->key, pNewNode->key, void *); + SWAP(pNode->data, pNewNode->data, void *); return pNewNode; } @@ -153,7 +154,7 @@ static FORCE_INLINE SHashNode *doUpdateHashNode(SHashNode *pNode, SHashNode *pNe * @param pHashObj * @param pNode */ -static void pushfrontNode(SHashEntry* pEntry, SHashNode *pNode); +static void pushfrontNode(SHashEntry *pEntry, SHashNode *pNode); /** * Get the next element in hash table for iterator @@ -181,17 +182,16 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp pHashObj->type = type; pHashObj->enableUpdate = update; - pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void*)); + pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(void *)); if (pHashObj->hashList == NULL) { free(pHashObj); uError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } else { + pHashObj->pMemBlock = taosArrayInit(8, sizeof(void *)); - pHashObj->pMemBlock = taosArrayInit(8, sizeof(void*)); - - void* p = calloc(pHashObj->capacity, sizeof(SHashEntry)); - for(int32_t i = 0; i < pHashObj->capacity; ++i) { + void *p = calloc(pHashObj->capacity, sizeof(SHashEntry)); + for (int32_t i = 0; i < pHashObj->capacity; ++i) { pHashObj->hashList[i] = p + i * sizeof(SHashEntry); } @@ -201,9 +201,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool update, SHashLockTyp return pHashObj; } -size_t taosHashGetSize(const SHashObj *pHashObj) { - return (pHashObj == NULL)? 0:pHashObj->size; -} +size_t taosHashGetSize(const SHashObj *pHashObj) { return (pHashObj == NULL) ? 0 : pHashObj->size; } int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t size) { uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen); @@ -221,14 +219,14 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da __rd_lock(&pHashObj->lock, pHashObj->type); - int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; if (pHashObj->type == HASH_ENTRY_LOCK) { taosWLockLatch(&pe->latch); } - SHashNode* pNode = pe->head.next; + SHashNode *pNode = pe->head.next; while (pNode) { if ((pNode->keyLen == keyLen) && (memcmp(pNode->key, key, keyLen) == 0)) { assert(pNode->hashVal == hashVal); @@ -265,11 +263,15 @@ int32_t taosHashPut(SHashObj *pHashObj, const void *key, size_t keyLen, void *da __rd_unlock(&pHashObj->lock, pHashObj->type); FREE_HASH_NODE(pNewNode); - return pHashObj->enableUpdate? 0:-1; + return pHashObj->enableUpdate ? 0 : -1; } } void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { + return taosHashGetCB(pHashObj, key, keyLen, NULL); +} + +void *taosHashGetCB(SHashObj *pHashObj, const void *key, size_t keyLen, void (*fp)(void *)) { if (pHashObj->size <= 0 || keyLen == 0 || key == NULL) { return NULL; } @@ -279,24 +281,42 @@ void *taosHashGet(SHashObj *pHashObj, const void *key, size_t keyLen) { // only add the read lock to disable the resize process __rd_lock(&pHashObj->lock, pHashObj->type); - SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, hashVal); - - __rd_unlock(&pHashObj->lock, pHashObj->type); + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + SHashEntry *pe = pHashObj->hashList[slot]; - if (pNode) { - assert(pNode->hashVal == hashVal); - return pNode->data; - } else { + // no data, return directly + if (atomic_load_32(&pe->num) == 0) { + __rd_unlock(&pHashObj->lock, pHashObj->type); return NULL; } + + char *data = NULL; + + // lock entry + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosRLockLatch(&pe->latch); + } + + SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal); + if (fp != NULL) { + fp(pNode->data); + } + + data = pNode->data; + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosRUnLockLatch(&pe->latch); + } + + __rd_unlock(&pHashObj->lock, pHashObj->type); + return data; } int32_t taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { - return taosHashRemoveNode(pHashObj, key, keyLen, NULL, 0); + return taosHashRemoveWithData(pHashObj, key, keyLen, NULL, 0); } -static FORCE_INLINE void popNodeFromEntryList(SHashEntry* pe, SHashNode* pNode) { - SHashNode* pNext = pNode->next; +static FORCE_INLINE void doPopFromEntryList(SHashEntry *pe, SHashNode *pNode) { + SHashNode *pNext = pNode->next; assert(pNode->prev != NULL); pNode->prev->next = pNext; @@ -307,17 +327,17 @@ static FORCE_INLINE void popNodeFromEntryList(SHashEntry* pe, SHashNode* pNode) pe->num -= 1; } -int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, void* data, size_t dsize) { - if (pHashObj->size <= 0) { +int32_t taosHashRemoveWithData(SHashObj *pHashObj, const void *key, size_t keyLen, void *data, size_t dsize) { + if (pHashObj == NULL || pHashObj->size <= 0) { return -1; } uint32_t hashVal = (*pHashObj->hashFp)(key, keyLen); // disable the resize process - __rd_lock(&pHashObj->lock, pHashObj->type); + __rd_lock(&pHashObj->lock, pHashObj->type); - int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); + int32_t slot = HASH_INDEX(hashVal, pHashObj->capacity); SHashEntry *pe = pHashObj->hashList[slot]; // no data, return directly @@ -330,9 +350,9 @@ int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, v taosWLockLatch(&pe->latch); } - SHashNode* pNode = doSearchEntryList(pe, key, keyLen, hashVal); + SHashNode *pNode = doSearchInEntryList(pe, key, keyLen, hashVal); if (pNode != NULL) { - popNodeFromEntryList(pe, pNode); + doPopFromEntryList(pe, pNode); } if (pHashObj->type == HASH_ENTRY_LOCK) { @@ -341,13 +361,13 @@ int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, v __rd_unlock(&pHashObj->lock, pHashObj->type); - atomic_sub_fetch_64(&pHashObj->size, 1); - if (data != NULL) { memcpy(data, pNode->data, dsize); } if (pNode != NULL) { + atomic_sub_fetch_64(&pHashObj->size, 1); + pNode->next = NULL; pNode->prev = NULL; @@ -359,6 +379,49 @@ int32_t taosHashRemoveNode(SHashObj *pHashObj, const void *key, size_t keyLen, v } } +int32_t taosHashCondTraverse(SHashObj *pHashObj, bool (*fp)(void *, void *), void *param) { + if (pHashObj == NULL || pHashObj->size == 0) { + return 0; + } + + // disable the resize process + __rd_lock(&pHashObj->lock, pHashObj->type); + + int32_t numOfEntries = pHashObj->capacity; + for (int32_t i = 0; i < numOfEntries; ++i) { + SHashEntry *pEntry = pHashObj->hashList[i]; + if (pEntry->num <= 0) { + continue; + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWLockLatch(&pEntry->latch); + } + + SHashNode *pNode = pEntry->head.next; + assert(pNode != NULL); + + SHashNode *pNext = NULL; + while (pNode != NULL) { + pNext = pNode->next; + + // not qualified, remove it + if (fp && (!fp(param, pNode->data))) { + doPopFromEntryList(pEntry, pNode); + } + + pNode = pNext; + } + + if (pHashObj->type == HASH_ENTRY_LOCK) { + taosWUnLockLatch(&pEntry->latch); + } + } + + __rd_unlock(&pHashObj->lock, pHashObj->type); + return 0; +} + void taosHashCleanup(SHashObj *pHashObj) { if (pHashObj == NULL) { return; @@ -370,7 +433,7 @@ void taosHashCleanup(SHashObj *pHashObj) { if (pHashObj->hashList) { for (int32_t i = 0; i < pHashObj->capacity; ++i) { - SHashEntry* pEntry = pHashObj->hashList[i]; + SHashEntry *pEntry = pHashObj->hashList[i]; if (pEntry->num == 0) { assert(pEntry->head.next == 0); continue; @@ -396,8 +459,8 @@ void taosHashCleanup(SHashObj *pHashObj) { // destroy mem block size_t memBlock = taosArrayGetSize(pHashObj->pMemBlock); - for(int32_t i = 0; i < memBlock; ++i) { - void* p = taosArrayGetP(pHashObj->pMemBlock, i); + for (int32_t i = 0; i < memBlock; ++i) { + void *p = taosArrayGetP(pHashObj->pMemBlock, i); tfree(p); } @@ -414,6 +477,9 @@ SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) { } pIter->pHashObj = pHashObj; + + // keep it in local variable, in case the resize operation expand the size + pIter->numOfEntries = pHashObj->capacity; return pIter; } @@ -428,7 +494,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) { } // check the first one - if (pIter->num == 0) { + if (pIter->numOfChecked == 0) { assert(pIter->pCur == NULL && pIter->pNext == NULL); while (1) { @@ -438,18 +504,30 @@ bool taosHashIterNext(SHashMutableIterator *pIter) { continue; } + if (pIter->pHashObj->type == HASH_ENTRY_LOCK) { + taosRLockLatch(&pEntry->latch); + } + pIter->pCur = pEntry->head.next; if (pIter->pCur->next) { pIter->pNext = pIter->pCur->next; + + if (pIter->pHashObj->type == HASH_ENTRY_LOCK) { + taosRUnLockLatch(&pEntry->latch); + } } else { + if (pIter->pHashObj->type == HASH_ENTRY_LOCK) { + taosRUnLockLatch(&pEntry->latch); + } + pIter->pNext = getNextHashNode(pIter); } break; } - pIter->num++; + pIter->numOfChecked++; return true; } else { assert(pIter->pCur != NULL); @@ -459,7 +537,7 @@ bool taosHashIterNext(SHashMutableIterator *pIter) { return false; } - pIter->num++; + pIter->numOfChecked++; if (pIter->pCur->next) { pIter->pNext = pIter->pCur->next; @@ -504,30 +582,30 @@ void taosHashTableResize(SHashObj *pHashObj) { if (!HASH_NEED_RESIZE(pHashObj)) { return; } - + // double the original capacity SHashNode *pNode = NULL; SHashNode *pNext = NULL; - + int32_t newSize = pHashObj->capacity << 1u; if (newSize > HASH_MAX_CAPACITY) { -// uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", -// pHashObj->capacity, HASH_MAX_CAPACITY); + // uDebug("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", + // pHashObj->capacity, HASH_MAX_CAPACITY); return; } void *pNewEntryList = realloc(pHashObj->hashList, sizeof(SHashEntry) * newSize); - if (pNewEntryList == NULL) {// todo handle error -// uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); + if (pNewEntryList == NULL) { // todo handle error + // uDebug("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); return; } - + pHashObj->hashList = pNewEntryList; size_t inc = newSize - pHashObj->capacity; - void* p = calloc(inc, sizeof(SHashEntry)); + void * p = calloc(inc, sizeof(SHashEntry)); - for(int32_t i = 0; i < inc; ++i) { + for (int32_t i = 0; i < inc; ++i) { pHashObj->hashList[i + pHashObj->capacity] = p + i * sizeof(SHashEntry); } @@ -535,7 +613,7 @@ void taosHashTableResize(SHashObj *pHashObj) { pHashObj->capacity = newSize; for (int32_t i = 0; i < pHashObj->capacity; ++i) { - SHashEntry* pe = pHashObj->hashList[i]; + SHashEntry *pe = pHashObj->hashList[i]; if (pe->num == 0) { assert(pe->head.next == NULL); continue; @@ -550,7 +628,7 @@ void taosHashTableResize(SHashObj *pHashObj) { pNext = pNode->next; assert(pNode != pNext && (pNext == NULL || pNext->prev == pNode) && pNode->prev->next == pNode); - popNodeFromEntryList(pe, pNode); + doPopFromEntryList(pe, pNode); // clear pointer pNode->next = NULL; @@ -566,8 +644,8 @@ void taosHashTableResize(SHashObj *pHashObj) { } } -// uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity, -// ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); + // uDebug("hash table resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pHashObj->capacity, + // ((double)pHashObj->size) / pHashObj->capacity, (et - st) / 1000.0); } SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, size_t dsize, uint32_t hashVal) { @@ -579,7 +657,7 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s pNewNode->data = malloc(dsize + keyLen); memcpy(pNewNode->data, pData, dsize); - + pNewNode->key = pNewNode->data + dsize; memcpy(pNewNode->key, key, keyLen); @@ -588,10 +666,10 @@ SHashNode *doCreateHashNode(const void *key, size_t keyLen, const void *pData, s return pNewNode; } -void pushfrontNode(SHashEntry* pEntry, SHashNode *pNode) { +void pushfrontNode(SHashEntry *pEntry, SHashNode *pNode) { assert(pNode != NULL && pEntry != NULL); - - SHashNode* pNext = pEntry->head.next; + + SHashNode *pNext = pEntry->head.next; if (pNext != NULL) { pNext->prev = pNode; } @@ -605,17 +683,29 @@ void pushfrontNode(SHashEntry* pEntry, SHashNode *pNode) { SHashNode *getNextHashNode(SHashMutableIterator *pIter) { assert(pIter != NULL); - + pIter->entryIndex++; - while (pIter->entryIndex < pIter->pHashObj->capacity) { - SHashEntry*pEntry = pIter->pHashObj->hashList[pIter->entryIndex]; + SHashNode *p = NULL; + + while (pIter->entryIndex < pIter->numOfEntries) { + SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex]; if (pEntry->num == 0) { pIter->entryIndex++; continue; } - - return pEntry->head.next; + + if (pIter->pHashObj->type == HASH_ENTRY_LOCK) { + taosRLockLatch(&pEntry->latch); + } + + p = pEntry->head.next; + + if (pIter->pHashObj->type == HASH_ENTRY_LOCK) { + taosRUnLockLatch(&pEntry->latch); + } + + return p; } - + return NULL; } diff --git a/src/util/src/tcache.c b/src/util/src/tcache.c index f8711828d183555e6ea630a334b692963485e587..8523a6f8b6548b2495312152b7fa4acc0e8fd7b1 100644 --- a/src/util/src/tcache.c +++ b/src/util/src/tcache.c @@ -63,13 +63,6 @@ static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pCacheObj) { #endif } -#if 0 -static FORCE_INLINE void taosFreeNode(void *data) { - SCacheDataNode *pNode = *(SCacheDataNode **)data; - free(pNode); -} -#endif - /** * @param key key of object for hash, usually a null-terminated string * @param keyLen length of key @@ -89,13 +82,6 @@ static SCacheDataNode *taosCreateCacheNode(const char *key, size_t keyLen, const */ static void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode); -/** - * remove node in trash can - * @param pCacheObj - * @param pElem - */ -static void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem); - /** * remove nodes in trash with refCount == 0 in cache * @param pNode @@ -113,17 +99,19 @@ static void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force); static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pCacheObj, SCacheDataNode *pNode) { if (pNode->signature != (uint64_t)pNode) { uError("key:%s, %p data is invalid, or has been released", pNode->key, pNode); + assert(0); return; } - taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); - pCacheObj->totalSize -= pNode->size; + int32_t size = taosHashGetSize(pCacheObj->pHashTable); uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", - pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, - pNode->size); + pCacheObj->name, pNode->key, pNode->data, size, pCacheObj->totalSize, pNode->size); + + if (pCacheObj->freeFp) { + pCacheObj->freeFp(pNode->data); + } - if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); free(pNode); } @@ -137,6 +125,32 @@ static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pCacheObj, SCacheDataNo taosAddToTrash(pCacheObj, pNode); } +static FORCE_INLINE void doRemoveElemInTrashcan(SCacheObj* pCacheObj, STrashElem *pElem) { + if (pElem->pData->signature != (uint64_t) pElem->pData) { + uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData); + return; + } + + pCacheObj->numOfElemsInTrash--; + if (pElem->prev) { + pElem->prev->next = pElem->next; + } else { // pnode is the header, update header + pCacheObj->pTrash = pElem->next; + } + + if (pElem->next) { + pElem->next->prev = pElem->prev; + } +} + +static FORCE_INLINE void doDestroyTrashcanElem(SCacheObj* pCacheObj, STrashElem *pElem) { + if (pCacheObj->freeFp) { + pCacheObj->freeFp(pElem->pData->data); + } + + free(pElem->pData); + free(pElem); +} /** * update data in cache * @param pCacheObj @@ -261,12 +275,11 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v } else { // duplicated key exists while (1) { SCacheDataNode* p = NULL; - int32_t ret = taosHashRemoveNode(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*)); + int32_t ret = taosHashRemoveWithData(pCacheObj->pHashTable, key, keyLen, (void*) &p, sizeof(void*)); // add to trashcan if (ret == 0) { if (T_REF_VAL_GET(p) == 0) { - if (pCacheObj->freeFp) { pCacheObj->freeFp(p->data); } @@ -300,27 +313,25 @@ void *taosCachePut(SCacheObj *pCacheObj, const void *key, size_t keyLen, const v return pNode1->data; } +static void incRefFn(void* ptNode) { + assert(ptNode != NULL); + + SCacheDataNode** p = (SCacheDataNode**) ptNode; + int32_t ret = T_REF_INC(*p); + assert(ret > 0); +} + void *taosCacheAcquireByKey(SCacheObj *pCacheObj, const void *key, size_t keyLen) { if (pCacheObj == NULL || taosHashGetSize(pCacheObj->pHashTable) == 0) { return NULL; } - void *pData = NULL; - -// __cache_rd_lock(pCacheObj); - SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pCacheObj->pHashTable, key, keyLen); - - int32_t ref = 0; - if (ptNode != NULL) { - ref = T_REF_INC(*ptNode); - pData = (*ptNode)->data; - } - -// __cache_unlock(pCacheObj); + SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGetCB(pCacheObj->pHashTable, key, keyLen, incRefFn); + void* pData = (ptNode != NULL)? (*ptNode)->data:NULL; if (pData != NULL) { atomic_add_fetch_32(&pCacheObj->statistics.hitCount, 1); - uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, ref); + uDebug("cache:%s, key:%p, %p is retrieved from cache, refcnt:%d", pCacheObj->name, key, pData, T_REF_VAL_GET(*ptNode)); } else { atomic_add_fetch_32(&pCacheObj->statistics.missCount, 1); uDebug("cache:%s, key:%p, not in cache, retrieved failed", pCacheObj->name, key); @@ -423,8 +434,11 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { if (_remove) { // NOTE: once refcount is decrease, pNode may be freed by other thread immediately. + char* key = pNode->key; + char* d = pNode->data; + int32_t ref = T_REF_DEC(pNode); - uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, pNode->key, pNode->data, ref); + uDebug("cache:%s, key:%p, %p is released, refcnt:%d", pCacheObj->name, key, d, ref); /* * If it is not referenced by other users, remove it immediately. Otherwise move this node to trashcan wait for all users @@ -437,24 +451,35 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { if (ref == 0) { assert(pNode->pTNodeHeader->pData == pNode); - // todo add lock here - taosRemoveFromTrashCan(pCacheObj, pNode->pTNodeHeader); + __cache_wr_lock(pCacheObj); + doRemoveElemInTrashcan(pCacheObj, pNode->pTNodeHeader); + __cache_unlock(pCacheObj); + + doDestroyTrashcanElem(pCacheObj, pNode->pTNodeHeader); } } else { int32_t ret = taosHashRemove(pCacheObj->pHashTable, pNode->key, pNode->keySize); - if (ret == 0) { // successfully remove from hash table + + // successfully remove from hash table, if failed, this node must have been move to trash already, do nothing. + // note that the remove operation can be executed only once. + if (ret == 0) { if (ref > 0) { assert(pNode->pTNodeHeader == NULL); - // todo trashcan lock + __cache_wr_lock(pCacheObj); taosAddToTrash(pCacheObj, pNode); + __cache_unlock(pCacheObj); } else { // ref == 0 - atomic_fetch_sub_ptr(&pCacheObj->totalSize, pNode->size); + atomic_sub_fetch_64(&pCacheObj->totalSize, pNode->size); + uDebug("cache:%s, key:%p, %p is destroyed from cache, totalNum:%d totalSize:%" PRId64 "bytes size:%dbytes", pCacheObj->name, pNode->key, pNode->data, (int32_t)taosHashGetSize(pCacheObj->pHashTable), pCacheObj->totalSize, pNode->size); - if (pCacheObj->freeFp) pCacheObj->freeFp(pNode->data); + if (pCacheObj->freeFp) { + pCacheObj->freeFp(pNode->data); + } + free(pNode); } } @@ -462,33 +487,40 @@ void taosCacheRelease(SCacheObj *pCacheObj, void **data, bool _remove) { } else { // NOTE: once refcount is decrease, pNode may be freed by other thread immediately. + char* key = pNode->key; + char* p = pNode->data; + int32_t ref = T_REF_DEC(pNode); + uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, key, p, ref, inTrashCan); + } +} + +typedef struct SHashTravSupp { + SCacheObj* pCacheObj; + int64_t time; + __cache_free_fn_t fp; +} SHashTravSupp; - // todo so, invalid read here! - uDebug("cache:%s, key:%p, %p released, refcnt:%d, data in trancan:%d", pCacheObj->name, pNode->key, pNode->data, - ref, inTrashCan); +static bool travHashTableEmptyFn(void* param, void* data) { + SHashTravSupp* ps = (SHashTravSupp*) param; + SCacheObj* pCacheObj= ps->pCacheObj; + + SCacheDataNode *pNode = *(SCacheDataNode **) data; + + if (T_REF_VAL_GET(pNode) == 0) { + taosCacheReleaseNode(pCacheObj, pNode); + } else { // do add to trashcan + taosAddToTrash(pCacheObj, pNode); } + + // this node should be remove from hash table + return false; } void taosCacheEmpty(SCacheObj *pCacheObj) { - SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); - - __cache_wr_lock(pCacheObj); - while (taosHashIterNext(pIter)) { - if (pCacheObj->deleting == 1) { - break; - } - - SCacheDataNode *pNode = *(SCacheDataNode **) taosHashIterGet(pIter); - if (T_REF_VAL_GET(pNode) == 0) { - taosCacheReleaseNode(pCacheObj, pNode); - } else { - taosCacheMoveToTrash(pCacheObj, pNode); - } - } - __cache_unlock(pCacheObj); - - taosHashDestroyIter(pIter); + SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; + + taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); taosTrashCanEmpty(pCacheObj, false); } @@ -553,33 +585,6 @@ void taosAddToTrash(SCacheObj *pCacheObj, SCacheDataNode *pNode) { uDebug("key:%p, %p move to trash, numOfElem in trash:%d", pNode->key, pNode->data, pCacheObj->numOfElemsInTrash); } -void taosRemoveFromTrashCan(SCacheObj *pCacheObj, STrashElem *pElem) { - if (pElem->pData->signature != (uint64_t)pElem->pData) { - uError("key:sig:0x%" PRIx64 " %p data has been released, ignore", pElem->pData->signature, pElem->pData); - return; - } - - pCacheObj->numOfElemsInTrash--; - if (pElem->prev) { - pElem->prev->next = pElem->next; - } else { /* pnode is the header, update header */ - pCacheObj->pTrash = pElem->next; - } - - if (pElem->next) { - pElem->next->prev = pElem->prev; - } - - pElem->pData->signature = 0; - if (pCacheObj->freeFp) { - pCacheObj->freeFp(pElem->pData->data); - } - - free(pElem->pData); - free(pElem); -} - -// TODO add another lock when scanning trashcan void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { __cache_wr_lock(pCacheObj); @@ -587,8 +592,8 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { if (pCacheObj->pTrash != NULL) { uError("key:inconsistency data in cache, numOfElem in trash:%d", pCacheObj->numOfElemsInTrash); } - pCacheObj->pTrash = NULL; + pCacheObj->pTrash = NULL; __cache_unlock(pCacheObj); return; } @@ -604,10 +609,12 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { uDebug("key:%p, %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData->data, pCacheObj->numOfElemsInTrash - 1); - STrashElem *p = pElem; + STrashElem *p = pElem; pElem = pElem->next; - taosRemoveFromTrashCan(pCacheObj, p); + + doRemoveElemInTrashcan(pCacheObj, p); + doDestroyTrashcanElem(pCacheObj, p); } else { pElem = pElem->next; } @@ -617,26 +624,27 @@ void taosTrashCanEmpty(SCacheObj *pCacheObj, bool force) { } void doCleanupDataCache(SCacheObj *pCacheObj) { - __cache_wr_lock(pCacheObj); - SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); - while (taosHashIterNext(pIter)) { - SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); - - int32_t c = T_REF_VAL_GET(pNode); - if (c <= 0) { - taosCacheReleaseNode(pCacheObj, pNode); - } else { - uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key, - pNode->data, T_REF_VAL_GET(pNode)); - } - } - taosHashDestroyIter(pIter); +// SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); +// while (taosHashIterNext(pIter)) { +// SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); +// +// int32_t c = T_REF_VAL_GET(pNode); +// if (c <= 0) { +// taosCacheReleaseNode(pCacheObj, pNode); +// } else { +// uDebug("cache:%s key:%p, %p will not remove from cache, refcnt:%d", pCacheObj->name, pNode->key, +// pNode->data, T_REF_VAL_GET(pNode)); +// } +// } +// +// taosHashDestroyIter(pIter); + + SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = NULL, .time = taosGetTimestampMs()}; + taosHashCondTraverse(pCacheObj->pHashTable, travHashTableEmptyFn, &sup); // todo memory leak if there are object with refcount greater than 0 in hash table? taosHashCleanup(pCacheObj->pHashTable); - __cache_unlock(pCacheObj); - taosTrashCanEmpty(pCacheObj, true); __cache_lock_destroy(pCacheObj); @@ -645,26 +653,31 @@ void doCleanupDataCache(SCacheObj *pCacheObj) { free(pCacheObj); } -static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) { - SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); +bool travHashTableFn(void* param, void* data) { + SHashTravSupp* ps = (SHashTravSupp*) param; + SCacheObj* pCacheObj= ps->pCacheObj; -// __cache_wr_lock(pCacheObj); - while (taosHashIterNext(pIter)) { - SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + SCacheDataNode* pNode = *(SCacheDataNode **) data; + if (pNode->expireTime < ps->time && T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pCacheObj, pNode); - if (pNode->expireTime < time && T_REF_VAL_GET(pNode) <= 0) { - taosCacheReleaseNode(pCacheObj, pNode); - continue; - } + // this node should be remove from hash table + return false; + } - if (fp) { - fp(pNode->data); - } + if (ps->fp) { + (ps->fp)(pNode->data); } -// __cache_unlock(pCacheObj); + // do not remove element in hash table + return true; +} + +static void doCacheRefresh(SCacheObj* pCacheObj, int64_t time, __cache_free_fn_t fp) { + assert(pCacheObj != NULL); - taosHashDestroyIter(pIter); + SHashTravSupp sup = {.pCacheObj = pCacheObj, .fp = fp, .time = time}; + taosHashCondTraverse(pCacheObj->pHashTable, travHashTableFn, &sup); } void* taosCacheTimedRefresh(void *handle) {