diff --git a/src/client/src/tcache.c b/src/client/src/tcache.c index f9941c1c0f2102c7fc2b0c31eb264cbabcb3498a..ff741d1fd95d1daea2dc1416dc8b38f2d08be180 100644 --- a/src/client/src/tcache.c +++ b/src/client/src/tcache.c @@ -13,86 +13,17 @@ * along with this program. If not, see . */ -#include "os.h" - -#include "hashfunc.h" #include "tcache.h" +#include "hash.h" +#include "hashfunc.h" + #include "tlog.h" #include "ttime.h" #include "ttimer.h" #include "tutil.h" -#define HASH_MAX_CAPACITY (1024*1024*16) -#define HASH_VALUE_IN_TRASH (-1) -#define HASH_DEFAULT_LOAD_FACTOR (0.75) -#define HASH_INDEX(v, c) ((v) & ((c)-1)) - -/** - * todo: refactor to extract the hash table out of cache structure - */ -typedef struct SCacheStatis { - int64_t missCount; - int64_t hitCount; - int64_t totalAccess; - int64_t refreshCount; - int32_t numOfCollision; - int32_t numOfResize; - int64_t resizeTime; -} SCacheStatis; - -typedef struct _cache_node_t { - char * key; // null-terminated string - struct _cache_node_t *prev; - struct _cache_node_t *next; - uint64_t addTime; // the time when this element is added or updated into cache - uint64_t time; // end time when this element should be remove from cache - uint64_t signature; - - /* - * reference count for this object - * if this value is larger than 0, this value will never be released - */ - uint32_t refCount; - uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash - uint32_t nodeSize; // allocated size for current SDataNode - char data[]; -} SDataNode; - -typedef uint32_t (*_hashFunc)(const char *, uint32_t); - -typedef struct { - SDataNode **hashList; - int capacity; - int size; - int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. - int64_t refreshTime; - - /* - * to accommodate the old datanode which has the same key value of new one in hashList - * when an new node is put into cache, if an existed one with the same key: - * 1. if the old one does not be referenced, update it. - * 2. otherwise, move the old one to pTrash, add the new one. - * - * when the node in pTrash does not be referenced, it will be release at the expired time - */ - SDataNode * pTrash; - void * tmrCtrl; - void * pTimer; - SCacheStatis statistics; - _hashFunc hashFp; - int numOfElemsInTrash; // number of element in trash - int16_t deleting; // set the deleting flag to stop refreshing asap. - -#if defined LINUX - pthread_rwlock_t lock; -#else - pthread_mutex_t lock; -#endif - -} SCacheObj; - static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) { -#if defined LINUX +#if defined(LINUX) pthread_rwlock_wrlock(&pObj->lock); #else pthread_mutex_lock(&pObj->lock); @@ -100,7 +31,7 @@ static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) { } static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) { -#if defined LINUX +#if defined(LINUX) pthread_rwlock_rdlock(&pObj->lock); #else pthread_mutex_lock(&pObj->lock); @@ -108,7 +39,7 @@ static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) { } static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) { -#if defined LINUX +#if defined(LINUX) pthread_rwlock_unlock(&pObj->lock); #else pthread_mutex_unlock(&pObj->lock); @@ -116,7 +47,7 @@ static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) { } static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) { -#if defined LINUX +#if defined(LINUX) return pthread_rwlock_init(&pObj->lock, NULL); #else return pthread_mutex_init(&pObj->lock, NULL); @@ -124,19 +55,16 @@ static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) { } static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pObj) { -#if defined LINUX +#if defined(LINUX) pthread_rwlock_destroy(&pObj->lock); #else pthread_mutex_destroy(&pObj->lock); #endif } -static FORCE_INLINE int32_t taosHashTableLength(int32_t length) { - int32_t trueLength = MIN(length, HASH_MAX_CAPACITY); - - int32_t i = 4; - while (i < trueLength) i = (i << 1); - return i; +static FORCE_INLINE void taosFreeNode(void *data) { + SCacheDataNode *pNode = *(SCacheDataNode **)data; + free(pNode); } /** @@ -145,86 +73,83 @@ static FORCE_INLINE int32_t taosHashTableLength(int32_t length) { * @param pData actually data. required a consecutive memory block, no pointer is allowed * in pData. Pointer copy causes memory access error. * @param size size of block - * @param lifespan total survial time from now - * @return SDataNode + * @param lifespan total survial expiredTime from now + * @return SCacheDataNode */ -static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const char *pData, size_t dataSize, - uint64_t lifespan) { - size_t totalSize = dataSize + sizeof(SDataNode) + keyLen; - - SDataNode *pNewNode = calloc(1, totalSize); +static SCacheDataNode *taosCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t size, + uint64_t duration) { + size_t totalSize = size + sizeof(SCacheDataNode) + keyLen; + + SCacheDataNode *pNewNode = calloc(1, totalSize); if (pNewNode == NULL) { pError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - - memcpy(pNewNode->data, pData, dataSize); - pNewNode->addTime = (uint64_t)taosGetTimestampMs(); - pNewNode->time = pNewNode->addTime + lifespan; - - pNewNode->key = pNewNode->data + dataSize; - strcpy(pNewNode->key, key); - + + memcpy(pNewNode->data, pData, size); + + pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size; + pNewNode->keySize = keyLen; + + memcpy(pNewNode->key, key, keyLen); + + pNewNode->addedTime = (uint64_t)taosGetTimestampMs(); + pNewNode->expiredTime = pNewNode->addedTime + duration; + pNewNode->signature = (uint64_t)pNewNode; - pNewNode->nodeSize = (uint32_t)totalSize; - + pNewNode->size = (uint32_t)totalSize; + return pNewNode; } /** - * hash key function - * - * @param key key string - * @param len length of key - * @return hash value - */ -static FORCE_INLINE uint32_t taosHashKey(const char *key, uint32_t len) { return MurmurHash3_32(key, len); } - -/** - * add object node into trash, and this object is closed for referencing if it is add to trash + * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash * It will be removed until the pNode->refCount == 0 * @param pObj Cache object * @param pNode Cache slot object */ -static void taosAddToTrash(SCacheObj *pObj, SDataNode *pNode) { - if (pNode->hashVal == HASH_VALUE_IN_TRASH) { /* node is already in trash */ +static void taosAddToTrash(SCacheObj *pObj, SCacheDataNode *pNode) { + if (pNode->inTrash) { /* node is already in trash */ return; } - - pNode->next = pObj->pTrash; + + STrashElem *pElem = calloc(1, sizeof(STrashElem)); + pElem->pData = pNode; + + pElem->next = pObj->pTrash; if (pObj->pTrash) { - pObj->pTrash->prev = pNode; + pObj->pTrash->prev = pElem; } - - pNode->prev = NULL; - pObj->pTrash = pNode; - - pNode->hashVal = HASH_VALUE_IN_TRASH; + + pElem->prev = NULL; + pObj->pTrash = pElem; + + pNode->inTrash = true; pObj->numOfElemsInTrash++; - + pTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pObj->numOfElemsInTrash); } -static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) { - if (pNode->signature != (uint64_t)pNode) { - pError("key:sig:%d %p data has been released, ignore", pNode->signature, pNode); +static void taosRemoveFromTrash(SCacheObj *pObj, STrashElem *pElem) { + if (pElem->pData->signature != (uint64_t)pElem->pData) { + pError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData); return; } - + pObj->numOfElemsInTrash--; - if (pNode->prev) { - pNode->prev->next = pNode->next; - } else { - /* pnode is the header, update header */ - pObj->pTrash = pNode->next; + if (pElem->prev) { + pElem->prev->next = pElem->next; + } else { /* pnode is the header, update header */ + pObj->pTrash = pElem->next; } - - if (pNode->next) { - pNode->next->prev = pNode->prev; + + if (pElem->next) { + pElem->next->prev = pElem->prev; } - - pNode->signature = 0; - free(pNode); + + pElem->pData->signature = 0; + free(pElem->pData); + free(pElem); } /** * remove nodes in trash with refCount == 0 in cache @@ -233,236 +158,57 @@ static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) { * @param force force model, if true, remove data in trash without check refcount. * may cause corruption. So, forece model only applys before cache is closed */ -static void taosClearCacheTrash(SCacheObj *pObj, bool force) { +static void taosTrashEmpty(SCacheObj *pObj, bool force) { __cache_wr_lock(pObj); - + if (pObj->numOfElemsInTrash == 0) { if (pObj->pTrash != NULL) { pError("key:inconsistency data in cache, numOfElem in trash:%d", pObj->numOfElemsInTrash); } pObj->pTrash = NULL; - + __cache_unlock(pObj); return; } - - SDataNode *pNode = pObj->pTrash; - - while (pNode) { - if (pNode->refCount < 0) { - pError("key:%s %p in trash released more than referenced, removed", pNode->key, pNode); - pNode->refCount = 0; - } - - if (pNode->next == pNode) { - pNode->next = NULL; + + STrashElem *pElem = pObj->pTrash; + + while (pElem) { + T_REF_VAL_CHECK(pElem->pData); + if (pElem->next == pElem) { + pElem->next = NULL; } - - if (force || (pNode->refCount == 0)) { - pTrace("key:%s %p removed from trash. numOfElem in trash:%d", pNode->key, pNode, pObj->numOfElemsInTrash - 1) - SDataNode *pTmp = pNode; - pNode = pNode->next; - taosRemoveFromTrash(pObj, pTmp); + + if (force || (T_REF_VAL_GET(pElem->pData) == 0)) { + pTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData, + pObj->numOfElemsInTrash - 1); + STrashElem *p = pElem; + + pElem = pElem->next; + taosRemoveFromTrash(pObj, p); } else { - pNode = pNode->next; + pElem = pElem->next; } } - + assert(pObj->numOfElemsInTrash >= 0); __cache_unlock(pObj); } -/** - * add data node into cache - * @param pObj cache object - * @param pNode Cache slot object - */ -static void taosAddNodeToHashTable(SCacheObj *pObj, SDataNode *pNode) { - int32_t slotIndex = HASH_INDEX(pNode->hashVal, pObj->capacity); - pNode->next = pObj->hashList[slotIndex]; - - if (pObj->hashList[slotIndex] != NULL) { - (pObj->hashList[slotIndex])->prev = pNode; - pObj->statistics.numOfCollision++; - } - pObj->hashList[slotIndex] = pNode; - - pObj->size++; - pObj->totalSize += pNode->nodeSize; - - pTrace("key:%s %p add to hash table", pNode->key, pNode); -} - -/** - * remove node in hash list - * @param pObj - * @param pNode - */ -static void taosRemoveNodeInHashTable(SCacheObj *pObj, SDataNode *pNode) { - if (pNode->hashVal == HASH_VALUE_IN_TRASH) return; - - SDataNode *pNext = pNode->next; - if (pNode->prev != NULL) { - pNode->prev->next = pNext; - } else { /* the node is in hashlist, remove it */ - pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNext; - } - - if (pNext != NULL) { - pNext->prev = pNode->prev; - } - - pObj->size--; - pObj->totalSize -= pNode->nodeSize; - - pNode->next = NULL; - pNode->prev = NULL; - - pTrace("key:%s %p remove from hashtable", pNode->key, pNode); -} - -/** - * in-place node in hashlist - * @param pObj cache object - * @param pNode data node - */ -static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) { - assert(pNode->hashVal >= 0); - - if (pNode->prev) { - pNode->prev->next = pNode; - } else { - pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNode; - } - - if (pNode->next) { - (pNode->next)->prev = pNode; - } - - pTrace("key:%s %p update hashtable", pNode->key, pNode); -} - -/** - * get SDataNode from hashlist, nodes from trash are not included. - * @param pObj Cache objection - * @param key key for hash - * @param keyLen key length - * @return - */ -static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, const char *key, uint32_t keyLen) { - uint32_t hash = (*pObj->hashFp)(key, keyLen); - - int32_t slot = HASH_INDEX(hash, pObj->capacity); - SDataNode *pNode = pObj->hashList[slot]; - - while (pNode) { - if (strcmp(pNode->key, key) == 0) break; - - pNode = pNode->next; - } - - if (pNode) { - assert(HASH_INDEX(pNode->hashVal, pObj->capacity) == slot); - } - - return pNode; -} - -/** - * resize the hash list if the threshold is reached - * - * @param pObj - */ -static void taosHashTableResize(SCacheObj *pObj) { - if (pObj->size < pObj->capacity * HASH_DEFAULT_LOAD_FACTOR) { - return; - } - - // double the original capacity - pObj->statistics.numOfResize++; - SDataNode *pNode = NULL; - SDataNode *pNext = NULL; - - int32_t newSize = pObj->capacity << 1; - if (newSize > HASH_MAX_CAPACITY) { - pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", - pObj->capacity, HASH_MAX_CAPACITY); - return; - } - - int64_t st = taosGetTimestampUs(); - SDataNode **pList = realloc(pObj->hashList, sizeof(SDataNode *) * newSize); - if (pList == NULL) { - pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity); - return; - } - - pObj->hashList = pList; - - int32_t inc = newSize - pObj->capacity; - memset(&pObj->hashList[pObj->capacity], 0, inc * sizeof(SDataNode *)); - - pObj->capacity = newSize; - - for (int32_t i = 0; i < pObj->capacity; ++i) { - pNode = pObj->hashList[i]; - - while (pNode) { - int32_t j = HASH_INDEX(pNode->hashVal, pObj->capacity); - if (j == i) { // this key resides in the same slot, no need to relocate it - pNode = pNode->next; - } else { - pNext = pNode->next; - - // remove from current slot - if (pNode->prev != NULL) { - pNode->prev->next = pNode->next; - } else { - pObj->hashList[i] = pNode->next; - } - - if (pNode->next != NULL) { - (pNode->next)->prev = pNode->prev; - } - - // added into new slot - pNode->next = NULL; - pNode->prev = NULL; - - pNode->next = pObj->hashList[j]; - - if (pObj->hashList[j] != NULL) { - (pObj->hashList[j])->prev = pNode; - } - pObj->hashList[j] = pNode; - - // continue - pNode = pNext; - } - } - } - - int64_t et = taosGetTimestampUs(); - pObj->statistics.resizeTime += (et - st); - - pTrace("cache resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pObj->capacity, - ((double)pObj->size) / pObj->capacity, (et - st) / 1000.0); -} - /** * release node * @param pObj cache object * @param pNode data node */ -static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SDataNode *pNode) { - taosRemoveNodeInHashTable(pObj, pNode); +static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SCacheDataNode *pNode) { if (pNode->signature != (uint64_t)pNode) { pError("key:%s, %p data is invalid, or has been released", pNode->key, pNode); return; } - - pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->size, pObj->totalSize); - pNode->signature = 0; + + taosHashRemove(pObj->pHashTable, pNode->key, pNode->keySize); + pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->totalSize, pObj->totalSize); + free(pNode); } @@ -471,8 +217,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SDataNode *pNode) * @param pObj * @param pNode */ -static FORCE_INLINE void taosCacheMoveNodeToTrash(SCacheObj *pObj, SDataNode *pNode) { - taosRemoveNodeInHashTable(pObj, pNode); +static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pObj, SCacheDataNode *pNode) { + taosHashRemove(pObj->pHashTable, pNode->key, pNode->keySize); taosAddToTrash(pObj, pNode); } @@ -486,56 +232,53 @@ static FORCE_INLINE void taosCacheMoveNodeToTrash(SCacheObj *pObj, SDataNode *pN * @param dataSize * @return */ -static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *key, int32_t keyLen, void *pData, - uint32_t dataSize, uint64_t keepTime) { - SDataNode *pNewNode = NULL; - +static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SCacheDataNode *pNode, char *key, int32_t keyLen, + void *pData, uint32_t dataSize, uint64_t duration) { + SCacheDataNode *pNewNode = NULL; + // only a node is not referenced by any other object, in-place update it - if (pNode->refCount == 0) { - size_t newSize = sizeof(SDataNode) + dataSize + keyLen; - - pNewNode = (SDataNode *)realloc(pNode, newSize); + if (T_REF_VAL_GET(pNode) == 0) { + size_t newSize = sizeof(SCacheDataNode) + dataSize + keyLen; + + pNewNode = (SCacheDataNode *)realloc(pNode, newSize); if (pNewNode == NULL) { return NULL; } - + pNewNode->signature = (uint64_t)pNewNode; memcpy(pNewNode->data, pData, dataSize); - - pNewNode->key = pNewNode->data + dataSize; - strcpy(pNewNode->key, key); - + + pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + dataSize; + pNewNode->keySize = keyLen; + memcpy(pNewNode->key, key, keyLen); + // update the timestamp information for updated key/value - pNewNode->addTime = taosGetTimestampMs(); - pNewNode->time = pNewNode->addTime + keepTime; - - atomic_add_fetch_32(&pNewNode->refCount, 1); - + pNewNode->addedTime = taosGetTimestampMs(); + pNewNode->expiredTime = pNewNode->addedTime + duration; + + T_REF_INC(pNewNode); + // the address of this node may be changed, so the prev and next element should update the corresponding pointer - taosUpdateInHashTable(pObj, pNewNode); + taosHashPut(pObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *)); } else { - int32_t hashVal = pNode->hashVal; - taosCacheMoveNodeToTrash(pObj, pNode); - - pNewNode = taosCreateHashNode(key, keyLen, pData, dataSize, keepTime); + taosCacheMoveToTrash(pObj, pNode); + + pNewNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration); if (pNewNode == NULL) { return NULL; } - - atomic_add_fetch_32(&pNewNode->refCount, 1); - - assert(hashVal == (*pObj->hashFp)(key, keyLen - 1)); - pNewNode->hashVal = hashVal; - - // add new element to hashtable - taosAddNodeToHashTable(pObj, pNewNode); + + T_REF_INC(pNewNode); + + // addedTime new element to hashtable + taosHashPut(pObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *)); } - + return pNewNode; } /** - * add data into hash table + * addedTime data into hash table * @param key * @param pData * @param size @@ -544,206 +287,31 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k * @param pNode * @return */ -static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, uint32_t keyLen, const char *pData, - int dataSize, uint64_t lifespan) { - SDataNode *pNode = taosCreateHashNode(key, keyLen, pData, dataSize, lifespan); +static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, size_t keyLen, const void *pData, + size_t dataSize, uint64_t duration) { + SCacheDataNode *pNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration); if (pNode == NULL) { return NULL; } - - atomic_add_fetch_32(&pNode->refCount, 1); - pNode->hashVal = (*pObj->hashFp)(key, keyLen - 1); - taosAddNodeToHashTable(pObj, pNode); - + + T_REF_INC(pNode); + taosHashPut(pObj->pHashTable, key, keyLen, &pNode, sizeof(void *)); return pNode; } -/** - * add data into cache - * - * @param handle cache object - * @param key key - * @param pData cached data - * @param dataSize data size - * @param keepTime survival time in second - * @return cached element - */ -void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, int keepTime) { - SDataNode *pNode; - SCacheObj *pObj; - - pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->capacity == 0) return NULL; - - uint32_t keyLen = (uint32_t)strlen(key) + 1; - +static void doCleanupDataCache(SCacheObj *pObj) { __cache_wr_lock(pObj); - - SDataNode *pOldNode = taosGetNodeFromHashTable(pObj, key, keyLen - 1); - - if (pOldNode == NULL) { // do add to cache - // check if the threshold is reached - taosHashTableResize(pObj); - - pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, keepTime * 1000L); - if (NULL != pNode) { - pTrace( - "key:%s %p added into cache, slot:%d, addTime:%" PRIu64 ", expireTime:%" PRIu64 ", cache total:%d, " - "size:%" PRId64 " bytes, collision:%d", - pNode->key, pNode, HASH_INDEX(pNode->hashVal, pObj->capacity), pNode->addTime, pNode->time, pObj->size, - pObj->totalSize, pObj->statistics.numOfCollision); - } - } else { // old data exists, update the node - pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L); - pTrace("key:%s %p exist in cache, updated", key, pNode); - } - - __cache_unlock(pObj); - - return (pNode != NULL) ? pNode->data : NULL; -} - -static FORCE_INLINE void taosDecRef(SDataNode *pNode) { - if (pNode == NULL) { - return; - } - - if (pNode->refCount > 0) { - atomic_sub_fetch_32(&pNode->refCount, 1); - pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount); - } else { - /* - * safety check. - * app may false releases cached object twice, to decrease the refcount more than acquired - */ - pError("key:%s is released by app more than referenced.refcnt:%d", pNode->key, pNode->refCount); - } -} - -/** - * remove data in cache, the data will not be removed immediately. - * if it is referenced by other object, it will be remain in cache - * @param handle - * @param data - */ -void taosRemoveDataFromCache(void *handle, void **data, bool _remove) { - SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->capacity == 0 || (*data) == NULL || (pObj->size + pObj->numOfElemsInTrash == 0)) return; - - size_t offset = offsetof(SDataNode, data); - SDataNode *pNode = (SDataNode *)((char *)(*data) - offset); - - if (pNode->signature != (uint64_t)pNode) { - pError("key: %p release invalid cache data", pNode); - return; - } - - *data = NULL; - - if (_remove) { - __cache_wr_lock(pObj); - // pNode may be released immediately by other thread after the reference count of pNode is set to 0, - // So we need to lock it in the first place. - taosDecRef(pNode); - taosCacheMoveNodeToTrash(pObj, pNode); - - __cache_unlock(pObj); - } else { - taosDecRef(pNode); - } -} - -/** - * get data from cache - * @param handle cache object - * @param key key - * @return cached data or NULL - */ -void *taosGetDataFromCache(void *handle, char *key) { - SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->capacity == 0) return NULL; - - uint32_t keyLen = (uint32_t)strlen(key); - - __cache_rd_lock(pObj); - - SDataNode *ptNode = taosGetNodeFromHashTable(handle, key, keyLen); - if (ptNode != NULL) { - atomic_add_fetch_32(&ptNode->refCount, 1); - } - - __cache_unlock(pObj); - - if (ptNode != NULL) { - atomic_add_fetch_32(&pObj->statistics.hitCount, 1); - pTrace("key:%s is retrieved from cache,refcnt:%d", key, ptNode->refCount); - } else { - atomic_add_fetch_32(&pObj->statistics.missCount, 1); - pTrace("key:%s not in cache,retrieved failed", key); - } - - atomic_add_fetch_32(&pObj->statistics.totalAccess, 1); - return (ptNode != NULL) ? ptNode->data : NULL; -} - -/** - * update data in cache - * @param handle hash object handle(pointer) - * @param key key for hash - * @param pData actually data - * @param size length of data - * @return new referenced data - */ -void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration) { - SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL || pObj->capacity == 0) return NULL; - - SDataNode *pNew = NULL; - - uint32_t keyLen = strlen(key) + 1; - - __cache_wr_lock(pObj); - - SDataNode *pNode = taosGetNodeFromHashTable(handle, key, keyLen - 1); - - if (pNode == NULL) { // object has been released, do add operation - pNew = taosAddToCacheImpl(pObj, key, keyLen, pData, size, duration * 1000L); - pWarn("key:%s does not exist, update failed,do add to cache.total:%d,size:%ldbytes", key, pObj->size, - pObj->totalSize); - } else { - pNew = taosUpdateCacheImpl(pObj, pNode, key, keyLen, pData, size, duration * 1000L); - pTrace("key:%s updated.expireTime:%" PRIu64 ".refCnt:%d", key, pNode->time, pNode->refCount); - } - - __cache_unlock(pObj); - return (pNew != NULL) ? pNew->data : NULL; -} - -static void doCleanUpDataCache(SCacheObj* pObj) { - SDataNode *pNode, *pNext; - - __cache_wr_lock(pObj); - - if (pObj->hashList && pObj->size > 0) { - for (int i = 0; i < pObj->capacity; ++i) { - pNode = pObj->hashList[i]; - while (pNode) { - pNext = pNode->next; - free(pNode); - pNode = pNext; - } - } - - tfree(pObj->hashList); + + if (taosHashGetSize(pObj->pHashTable) > 0) { + taosHashCleanup(pObj->pHashTable); } - + __cache_unlock(pObj); - - taosClearCacheTrash(pObj, true); + + taosTrashEmpty(pObj, true); __cache_lock_destroy(pObj); - + memset(pObj, 0, sizeof(SCacheObj)); - free(pObj); } @@ -751,197 +319,244 @@ static void doCleanUpDataCache(SCacheObj* pObj) { * refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pObj->refreshTime * @param handle Cache object handle */ -void taosRefreshDataCache(void *handle, void *tmrId) { - SDataNode *pNode, *pNext; +static void taosCacheRefresh(void *handle, void *tmrId) { SCacheObj *pObj = (SCacheObj *)handle; - - if (pObj == NULL || pObj->capacity <= 0) { + + if (pObj == NULL || taosHashGetSize(pObj->pHashTable) == 0) { pTrace("object is destroyed. no refresh retry"); return; } - + if (pObj->deleting == 1) { - doCleanUpDataCache(pObj); + doCleanupDataCache(pObj); return; } - - uint64_t time = taosGetTimestampMs(); - uint32_t numOfCheck = 0; + + uint64_t expiredTime = taosGetTimestampMs(); pObj->statistics.refreshCount++; - - int32_t num = pObj->size; - - for (int i = 0; i < pObj->capacity; ++i) { - // in deleting process, quit refreshing immediately + + SHashMutableIterator *pIter = taosHashCreateIter(pObj->pHashTable); + + __cache_wr_lock(pObj); + while (taosHashIterNext(pIter)) { if (pObj->deleting == 1) { + taosHashDestroyIter(pIter); break; } - - __cache_wr_lock(pObj); - pNode = pObj->hashList[i]; - - while (pNode) { - numOfCheck++; - pNext = pNode->next; - - if (pNode->time <= time && pNode->refCount <= 0) { - taosCacheReleaseNode(pObj, pNode); - } - pNode = pNext; - } - - /* all data have been checked, not need to iterate further */ - if (numOfCheck == num || pObj->size <= 0) { - __cache_unlock(pObj); - break; + + SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) { + taosCacheReleaseNode(pObj, pNode); } - - __cache_unlock(pObj); } - - if (pObj->deleting == 1) { // clean up resources and abort - doCleanUpDataCache(pObj); + + __cache_unlock(pObj); + + taosHashDestroyIter(pIter); + + if (pObj->deleting == 1) { // clean up resources and abort + doCleanupDataCache(pObj); } else { - taosClearCacheTrash(pObj, false); - taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); - } -} - -/** - * - * @param handle - * @param tmrId - */ -void taosClearDataCache(void *handle) { - SDataNode *pNode, *pNext; - SCacheObj *pObj = (SCacheObj *)handle; - - int32_t capacity = pObj->capacity; - - for (int i = 0; i < capacity; ++i) { - __cache_wr_lock(pObj); - - pNode = pObj->hashList[i]; - - while (pNode) { - pNext = pNode->next; - taosCacheMoveNodeToTrash(pObj, pNode); - pNode = pNext; - } - - pObj->hashList[i] = NULL; - - __cache_unlock(pObj); + taosTrashEmpty(pObj, false); + taosTmrReset(taosCacheRefresh, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); } - - taosClearCacheTrash(pObj, false); } -/** - * @param capacity maximum slots available for hash elements - * @param tmrCtrl timer ctrl - * @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and - * not referenced by other objects - * @return - */ -void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) { - if (tmrCtrl == NULL || refreshTime <= 0 || capacity <= 0) { +SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) { + if (tmrCtrl == NULL || refreshTime <= 0) { return NULL; } - + SCacheObj *pObj = (SCacheObj *)calloc(1, sizeof(SCacheObj)); if (pObj == NULL) { pError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - - // the max slots is not defined by user - pObj->capacity = taosHashTableLength(capacity); - assert((pObj->capacity & (pObj->capacity - 1)) == 0); - - pObj->hashFp = taosHashKey; - pObj->refreshTime = refreshTime * 1000; - - pObj->hashList = (SDataNode **)calloc(1, sizeof(SDataNode *) * pObj->capacity); - if (pObj->hashList == NULL) { + + pObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false); + if (pObj->pHashTable == NULL) { free(pObj); pError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - + + // set free cache node callback function for hash table + taosHashSetFreecb(pObj->pHashTable, taosFreeNode); + + pObj->refreshTime = refreshTime * 1000; pObj->tmrCtrl = tmrCtrl; - taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); - + + taosTmrReset(taosCacheRefresh, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer); + if (__cache_lock_init(pObj) != 0) { taosTmrStopA(&pObj->pTimer); - free(pObj->hashList); + taosHashCleanup(pObj->pHashTable); free(pObj); - + pError("failed to init lock, reason:%s", strerror(errno)); return NULL; } + + return pObj; +} - return (void *)pObj; +void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int duration) { + SCacheDataNode *pNode; + SCacheObj * pObj; + + pObj = (SCacheObj *)handle; + if (pObj == NULL || pObj->pHashTable == NULL) { + return NULL; + } + + size_t keyLen = strlen(key); + + __cache_wr_lock(pObj); + SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pObj->pHashTable, key, keyLen); + SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL; + + if (pOld == NULL) { // do addedTime to cache + pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, duration * 1000L); + if (NULL != pNode) { + pTrace("key:%s %p added into cache, addedTime:%" PRIu64 ", expireTime:%" PRIu64 ", cache total:%d, size:%" PRId64 + " bytes, collision:%d", + key, pNode, pNode->addedTime, pNode->expiredTime, dataSize, pObj->totalSize, + pObj->statistics.numOfCollision); + } + } else { // old data exists, update the node + pNode = taosUpdateCacheImpl(pObj, pOld, key, keyLen, pData, dataSize, duration * 1000L); + pTrace("key:%s %p exist in cache, updated", key, pNode); + } + + __cache_unlock(pObj); + + return (pNode != NULL) ? pNode->data : NULL; } -/** - * release all allocated memory and destroy the cache object. - * - * This function only set the deleting flag, and the specific work of clean up cache is delegated to - * taosRefreshDataCache function, which will executed every SCacheObj->refreshTime sec. - * - * If the value of SCacheObj->refreshTime is too large, the taosRefreshDataCache function may not be invoked - * before the main thread terminated, in which case all allocated resources are simply recycled by OS. - * - * @param handle - */ -void taosCleanUpDataCache(void *handle) { +void *taosCacheAcquireByName(void *handle, char *key) { SCacheObj *pObj = (SCacheObj *)handle; - if (pObj == NULL) { - return; + if (pObj == NULL || taosHashGetSize(pObj->pHashTable) == 0) { + return NULL; } - - pObj->deleting = 1; + + uint32_t keyLen = (uint32_t)strlen(key); + + __cache_rd_lock(pObj); + + SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pObj->pHashTable, key, keyLen); + if (ptNode != NULL) { + T_REF_INC(*ptNode); + } + + __cache_unlock(pObj); + + if (ptNode != NULL) { + atomic_add_fetch_32(&pObj->statistics.hitCount, 1); + pTrace("key:%s is retrieved from cache,refcnt:%d", key, T_REF_VAL_GET(*ptNode)); + } else { + atomic_add_fetch_32(&pObj->statistics.missCount, 1); + pTrace("key:%s not in cache,retrieved failed", key); + } + + atomic_add_fetch_32(&pObj->statistics.totalAccess, 1); + return (ptNode != NULL) ? (*ptNode)->data : NULL; } -void* taosGetDataFromExists(void* handle, void* data) { +void *taosCacheAcquireByData(void *handle, void *data) { SCacheObj *pObj = (SCacheObj *)handle; if (pObj == NULL || data == NULL) return NULL; - size_t offset = offsetof(SDataNode, data); - SDataNode *ptNode = (SDataNode *)((char *)data - offset); + size_t offset = offsetof(SCacheDataNode, data); + SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset); - if (ptNode->signature != (uint64_t) ptNode) { + if (ptNode->signature != (uint64_t)ptNode) { pError("key: %p the data from cache is invalid", ptNode); return NULL; } - int32_t ref = atomic_add_fetch_32(&ptNode->refCount, 1); - pTrace("%p add ref data in cache, refCnt:%d", data, ref) + int32_t ref = T_REF_INC(ptNode); + pTrace("%p addedTime ref data in cache, refCnt:%d", data, ref) // the data if referenced by at least one object, so the reference count must be greater than the value of 2. assert(ref >= 2); return data; } -void* taosTransferDataInCache(void* handle, void** data) { +void *taosCacheTransfer(void *handle, void **data) { SCacheObj *pObj = (SCacheObj *)handle; if (pObj == NULL || data == NULL) return NULL; - size_t offset = offsetof(SDataNode, data); - SDataNode *ptNode = (SDataNode *)((char *)(*data) - offset); + size_t offset = offsetof(SCacheDataNode, data); + SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset); - if (ptNode->signature != (uint64_t) ptNode) { + if (ptNode->signature != (uint64_t)ptNode) { pError("key: %p the data from cache is invalid", ptNode); return NULL; } - assert(ptNode->refCount >= 1); + assert(T_REF_VAL_GET(ptNode) >= 1); - char* d = *data; + char *d = *data; // clear its reference to old area *data = NULL; return d; } + +void taosCacheRelease(void *handle, void **data, bool _remove) { + SCacheObj *pObj = (SCacheObj *)handle; + if (pObj == NULL || (*data) == NULL || (taosHashGetSize(pObj->pHashTable) + pObj->numOfElemsInTrash == 0)) { + return; + } + + size_t offset = offsetof(SCacheDataNode, data); + + SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset); + + if (pNode->signature != (uint64_t)pNode) { + pError("key: %p release invalid cache data", pNode); + return; + } + + *data = NULL; + + if (_remove) { + __cache_wr_lock(pObj); + // pNode may be released immediately by other thread after the reference count of pNode is set to 0, + // So we need to lock it in the first place. + T_REF_DEC(pNode); + taosCacheMoveToTrash(pObj, pNode); + + __cache_unlock(pObj); + } else { + T_REF_DEC(pNode); + } +} + +void taosCacheEmpty(SCacheObj *pCacheObj) { + SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable); + + __cache_wr_lock(pCacheObj); + while (taosHashIterNext(pIter)) { + if (pCacheObj->deleting == 1) { + taosHashDestroyIter(pIter); + break; + } + + SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter); + taosCacheMoveToTrash(pCacheObj, pNode); + } + __cache_unlock(pCacheObj); + + taosHashDestroyIter(pIter); + taosTrashEmpty(pCacheObj, false); +} + +void taosCacheCleanup(SCacheObj *pCacheObj) { + if (pCacheObj == NULL) { + return; + } + + pCacheObj->deleting = 1; +} diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c index 89b43816c86db232b2d48de86a839bc958296b38..0a4d7e74d6dca4f6d2857ce6902385e92842f47b 100644 --- a/src/client/src/tscLocal.c +++ b/src/client/src/tscLocal.c @@ -484,7 +484,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) { pSql->res.qhandle = 0x1; pSql->res.numOfRows = 0; } else if (pCmd->command == TSDB_SQL_RESET_CACHE) { - taosClearDataCache(tscCacheHandle); + taosCacheEmpty(tscCacheHandle); } else if (pCmd->command == TSDB_SQL_SERV_VERSION) { tscProcessServerVer(pSql); } else if (pCmd->command == TSDB_SQL_CLI_VERSION) { diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c index 83cf638133a4d673118594483a09a365abd06e22..a7c5b0f6b92735d44dfb62c5c6490f2e100f139f 100644 --- a/src/client/src/tscServer.c +++ b/src/client/src/tscServer.c @@ -182,7 +182,6 @@ int tscSendMsgToServer(SSqlObj *pSql) { } pSql->ipList->ip[0] = inet_addr("192.168.0.1"); - SSqlCmd* pCmd = &pSql->cmd; if (pSql->cmd.command < TSDB_SQL_MGMT) { pSql->ipList->port = tsDnodeShellPort; @@ -2641,7 +2640,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); assert(pMeterMetaInfo->pMeterMeta == NULL); - pMeterMetaInfo->pMeterMeta = (STableMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta, + pMeterMetaInfo->pMeterMeta = (STableMeta *)taosCachePut(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta, pMeta->contLen, tsMeterMetaKeepTimer); // todo handle out of memory case if (pMeterMetaInfo->pMeterMeta == NULL) return 0; @@ -2750,7 +2749,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) { int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache pMeta->index = 0; - (void)taosAddDataIntoCache(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer); + (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer); } pSql->res.code = TSDB_CODE_SUCCESS; @@ -2857,9 +2856,9 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) { #endif // release the used metricmeta - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); - pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosAddDataIntoCache(tscCacheHandle, name, (char *)metricMetaList[i], + pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i], sizes[i], tsMetricMetaKeepTimer); tfree(metricMetaList[i]); @@ -2917,11 +2916,11 @@ int tscProcessShowRsp(SSqlObj *pSql) { key[0] = pCmd->msgType + 'a'; strcpy(key + 1, "showlist"); - taosRemoveDataFromCache(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false); + taosCacheRelease(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false); int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(STableMeta); pMeterMetaInfo->pMeterMeta = - (STableMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer); + (STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer); pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols; SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta); @@ -2975,14 +2974,14 @@ int tscProcessUseDbRsp(SSqlObj *pSql) { } int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) { - taosClearDataCache(tscCacheHandle); + taosCacheEmpty(tscCacheHandle); return 0; } int tscProcessDropTableRsp(SSqlObj *pSql) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); - STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name); + STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name); if (pMeterMeta == NULL) { /* not in cache, abort */ return 0; @@ -2996,11 +2995,11 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { * instead. */ tscTrace("%p force release metermeta after drop table:%s", pSql, pMeterMetaInfo->name); - taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true); + taosCacheRelease(tscCacheHandle, (void **)&pMeterMeta, true); if (pMeterMetaInfo->pMeterMeta) { - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true); - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true); } return 0; @@ -3009,23 +3008,23 @@ int tscProcessDropTableRsp(SSqlObj *pSql) { int tscProcessAlterTableMsgRsp(SSqlObj *pSql) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0); - STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name); + STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name); if (pMeterMeta == NULL) { /* not in cache, abort */ return 0; } tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMeterMetaInfo->name); - taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true); + taosCacheRelease(tscCacheHandle, (void **)&pMeterMeta, true); if (pMeterMetaInfo->pMeterMeta) { bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo); - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true); - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true); if (isSuperTable) { // if it is a super table, reset whole query cache tscTrace("%p reset query cache since table:%s is stable", pSql, pMeterMetaInfo->name); - taosClearDataCache(tscCacheHandle); + taosCacheEmpty(tscCacheHandle); } } @@ -3151,7 +3150,7 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMet * Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine */ if (code == TSDB_CODE_SUCCESS) { - pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta); + pMeterMetaInfo->pMeterMeta = taosCacheTransfer(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta); assert(pMeterMetaInfo->pMeterMeta != NULL); } @@ -3177,10 +3176,10 @@ int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) { // If this SMeterMetaInfo owns a metermeta, release it first if (pMeterMetaInfo->pMeterMeta != NULL) { - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false); } - pMeterMetaInfo->pMeterMeta = (STableMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name); + pMeterMetaInfo->pMeterMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name); if (pMeterMetaInfo->pMeterMeta != NULL) { STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta; @@ -3244,7 +3243,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) { } tscWaitingForCreateTable(pCmd); - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true); code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo); // todo ?? } else { @@ -3278,9 +3277,9 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid); - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); - SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr); + SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr); if (ppMeta == NULL) { required = true; break; @@ -3308,7 +3307,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) { SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i); - STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name); + STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name); tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex); } @@ -3353,8 +3352,8 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) { printf("create metric key:%s, index:%d\n", tagstr, i); #endif - taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); - pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr); + taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false); + pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *) taosCacheAcquireByName(tscCacheHandle, tagstr); } } diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 3d55ff1c7267adc242ff037e2df8faa423076319..f14643a72fb2afc97a50239529ae0678c55cd208 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -371,7 +371,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { pSql->sqlstr = NULL; taos_free_result_imp(pSql, 0); pSql->sqlstr = sqlstr; - taosClearDataCache(tscCacheHandle); + taosCacheEmpty(tscCacheHandle); if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL; tscTrace("meter synchronization completed"); } else { diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c index d11f21be0fea6853b05cf1a24ed5727f1eaae74f..866398b7f5cfeadf1d5eda6d1354128ce8ed1c75 100644 --- a/src/client/src/tscSystem.c +++ b/src/client/src/tscSystem.c @@ -186,7 +186,7 @@ void taos_init_imp() { refreshTime = refreshTime > 2 ? 2 : refreshTime; refreshTime = refreshTime < 1 ? 1 : refreshTime; - if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime); + if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime); tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 320aeee27e5f183b84184c56966780dde558054e..b655832f1159efe399cfeec94f8cc00cbb671f38 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -505,7 +505,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) { tfree(pDataBlock->params); // free the refcount for metermeta - taosRemoveDataFromCache(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false); + taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false); tfree(pDataBlock); } @@ -589,9 +589,9 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) { // set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) { strcpy(pMeterMetaInfo->name, pDataBlock->tableId); - taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false); + taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false); - pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pDataBlock->pMeterMeta); + pMeterMetaInfo->pMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pMeterMeta); } else { assert(strncmp(pMeterMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0); } @@ -665,7 +665,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff * due to operation such as drop database. So here we add the reference count directly instead of invoke * taosGetDataFromCache, which may return NULL value. */ - dataBuf->pMeterMeta = taosGetDataFromExists(tscCacheHandle, pMeterMeta); + dataBuf->pMeterMeta = taosCacheAcquireByData(tscCacheHandle, pMeterMeta); assert(initialSize > 0 && pMeterMeta != NULL && dataBuf->pMeterMeta != NULL); *dataBlocks = dataBuf; @@ -1940,8 +1940,8 @@ void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache) return; } - taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), removeFromCache); - taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMetricMeta), removeFromCache); + taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), removeFromCache); + taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMetricMeta), removeFromCache); } void tscResetForNextRetrieve(SSqlRes* pRes) { @@ -2071,16 +2071,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void SMeterMetaInfo* pFinalInfo = NULL; if (pPrevSql == NULL) { - STableMeta* pMeterMeta = taosGetDataFromCache(tscCacheHandle, name); - SSuperTableMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key); + STableMeta* pMeterMeta = taosCacheAcquireByName(tscCacheHandle, name); + SSuperTableMeta* pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key); pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex); } else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object. SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0); - STableMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta); - SSuperTableMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta); + STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta); + SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta); pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags, pMeterMetaInfo->tagColumnIndex); diff --git a/src/query/inc/tcache.h b/src/query/inc/tcache.h index b577c53ea8dbcdc9f069288b94d0244907e77f12..6f6ef17d8d34cc851111ec760ad4e19ff2788b87 100644 --- a/src/query/inc/tcache.h +++ b/src/query/inc/tcache.h @@ -20,7 +20,63 @@ extern "C" { #endif -#include +#include "os.h" +#include "tref.h" +#include "hash.h" + +typedef struct SCacheStatis { + int64_t missCount; + int64_t hitCount; + int64_t totalAccess; + int64_t refreshCount; + int32_t numOfCollision; +} SCacheStatis; + +typedef struct SCacheDataNode { + uint64_t addedTime; // the added time when this element is added or updated into cache + uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache + uint64_t signature; + uint32_t size; // allocated size for current SCacheDataNode + uint16_t keySize : 15; + bool inTrash : 1; // denote if it is in trash or not + T_REF_DECLARE() + char *key; + char data[]; +} SCacheDataNode; + +typedef struct STrashElem { + struct STrashElem *prev; + struct STrashElem *next; + SCacheDataNode * pData; +} STrashElem; + +typedef struct { + int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included. + int64_t refreshTime; + + /* + * to accommodate the old datanode which has the same key value of new one in hashList + * when an new node is put into cache, if an existed one with the same key: + * 1. if the old one does not be referenced, update it. + * 2. otherwise, move the old one to pTrash, addedTime the new one. + * + * when the node in pTrash does not be referenced, it will be release at the expired expiredTime + */ + STrashElem * pTrash; + void * tmrCtrl; + void * pTimer; + SCacheStatis statistics; + SHashObj * pHashTable; + int numOfElemsInTrash; // number of element in trash + int16_t deleting; // set the deleting flag to stop refreshing ASAP. + +#if defined(LINUX) + pthread_rwlock_t lock; +#else + pthread_mutex_t lock; +#endif + +} SCacheObj; /** * @@ -30,7 +86,7 @@ extern "C" { * not referenced by other objects * @return */ -void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTimeInSeconds); +SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds); /** * add data into cache @@ -42,28 +98,7 @@ void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTimeInSec * @param keepTime survival time in second * @return cached element */ -void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds); - -/** - * remove data in cache, the data will not be removed immediately. - * if it is referenced by other object, it will be remain in cache - * @param handle cache object - * @param data not the key, actually referenced data - * @param _remove force model, reduce the ref count and move the data into - * pTrash - */ -void taosRemoveDataFromCache(void *handle, void **data, bool _remove); - -/** - * update data in cache - * @param handle hash object handle(pointer) - * @param key key for hash - * @param pData actually data - * @param size length of data - * @param duration survival time of this object in cache - * @return new referenced data - */ -void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration); +void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds); /** * get data from cache @@ -71,40 +106,56 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in * @param key key * @return cached data or NULL */ -void *taosGetDataFromCache(void *handle, char *key); +void *taosCacheAcquireByName(void *handle, char *key); /** - * release all allocated memory and destroy the cache object + * Add one reference count for the exist data, and assign this data for a new owner. + * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore. + * This procedure is a faster version of taosCacheAcquireByName function, which avoids the sideeffect of the problem of + * the data is moved to trash, and taosCacheAcquireByName will fail to retrieve it again. * * @param handle + * @param data + * @return */ -void taosCleanUpDataCache(void *handle); +void *taosCacheAcquireByData(void *handle, void *data); /** - * move all data node into trash,clear node in trash can if it is not referenced by client + * transfer the ownership of data in cache to another object without increasing reference count. * @param handle + * @param data + * @return */ -void taosClearDataCache(void *handle); +void *taosCacheTransfer(void *handle, void **data); /** - * Add one reference count for the exist data, and assign this data for a new owner. - * The new owner needs to invoke the taosRemoveDataFromCache when it does not need this data anymore. - * This procedure is a faster version of taosGetDataFromCache function, which avoids the sideeffect of the problem of the - * data is moved to trash, and taosGetDataFromCache will fail to retrieve it again. - * + * remove data in cache, the data will not be removed immediately. + * if it is referenced by other object, it will be remain in cache + * @param handle cache object + * @param data not the key, actually referenced data + * @param _remove force model, reduce the ref count and move the data into + * pTrash + */ +void taosCacheRelease(void *handle, void **data, bool _remove); + +/** + * move all data node into trash, clear node in trash can if it is not referenced by any clients * @param handle - * @param data - * @return */ -void* taosGetDataFromExists(void* handle, void* data); +void taosCacheEmpty(SCacheObj *pCacheObj); /** - * transfer the ownership of data in cache to another object without increasing reference count. + * release all allocated memory and destroy the cache object. + * + * This function only set the deleting flag, and the specific work of clean up cache is delegated to + * taosCacheRefresh function, which will executed every SCacheObj->refreshTime sec. + * + * If the value of SCacheObj->refreshTime is too large, the taosCacheRefresh function may not be invoked + * before the main thread terminated, in which case all allocated resources are simply recycled by OS. + * * @param handle - * @param data - * @return */ -void* taosTransferDataInCache(void* handle, void** data); +void taosCacheCleanup(SCacheObj *pCacheObj); #ifdef __cplusplus } diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h index 8aa4da85f7bc41513312b0ce50715bee636073b4..1bbc8dcf5c229112b421de9a4b5ef4f056a39438 100644 --- a/src/util/inc/hash.h +++ b/src/util/inc/hash.h @@ -23,17 +23,18 @@ extern "C" { #include "hashfunc.h" #define HASH_MAX_CAPACITY (1024 * 1024 * 16) -#define HASH_VALUE_IN_TRASH (-1) #define HASH_DEFAULT_LOAD_FACTOR (0.75) #define HASH_INDEX(v, c) ((v) & ((c)-1)) +typedef void (*_hash_free_fn_t)(void *param); + typedef struct SHashNode { - char *key; // null-terminated string + char *key; union { struct SHashNode * prev; struct SHashEntry *prev1; }; - + struct SHashNode *next; uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash uint32_t keyLen; // length of the key @@ -46,18 +47,27 @@ typedef struct SHashEntry { } SHashEntry; typedef struct SHashObj { - SHashEntry **hashList; - size_t capacity; // number of slots - size_t size; // number of elements in hash table - _hash_fn_t hashFp; // hash function - -#if defined (LINUX) - pthread_rwlock_t* lock; + SHashEntry ** hashList; + size_t capacity; // number of slots + size_t size; // number of elements in hash table + _hash_fn_t hashFp; // hash function + _hash_free_fn_t freeFp; // hash node free callback function + +#if defined(LINUX) + pthread_rwlock_t *lock; #else - pthread_mutex_t* lock; + pthread_mutex_t *lock; #endif } SHashObj; +typedef struct SHashMutableIterator { + SHashObj * pHashObj; + int32_t entryIndex; + SHashNode *pCur; + SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current + int32_t num; // already check number of elements in hash table +} SHashMutableIterator; + /** * init the hash table * @@ -102,7 +112,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen); * @param key * @param keyLen */ -void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen); +void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen); /** * clean up hash table @@ -110,6 +120,41 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen); */ void taosHashCleanup(SHashObj *pHashObj); +/** + * Set the free callback function + * This function if set will be invoked right before freeing each hash node + * @param pHashObj + */ +void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp); + +/** + * + * @param pHashObj + * @return + */ +SHashMutableIterator* taosHashCreateIter(SHashObj *pHashObj); + +/** + * + * @param iter + * @return + */ +bool taosHashIterNext(SHashMutableIterator *iter); + +/** + * + * @param iter + * @return + */ +void *taosHashIterGet(SHashMutableIterator *iter); + +/** + * + * @param iter + * @return + */ +void* taosHashDestroyIter(SHashMutableIterator* iter); + /** * * @param pHashObj diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h new file mode 100644 index 0000000000000000000000000000000000000000..9483c1cc35e6d01c7b49c993d34f0c03ad950fdd --- /dev/null +++ b/src/util/inc/tref.h @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef TDENGINE_TREF_H +#define TDENGINE_TREF_H + +#include "os.h" + +typedef void (*_ref_fn_t)(const void* pObj); + +#define T_REF_DECLARE() \ + struct { \ + int16_t val; \ + } _ref; + +#define T_REF_REGISTER_FUNC(s, e) \ + struct { \ + _ref_fn_t start; \ + _ref_fn_t end; \ + } _ref_func = {.begin = (s), .end = (e)}; + +#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1)); + +#define T_REF_INC_WITH_CB(x, p) \ + do { \ + int32_t v = atomic_add_fetch_32(&((x)->_ref.val), 1); \ + if (v == 1 && (p)->_ref_func.begin != NULL) { \ + (p)->_ref_func.begin((x)); \ + } \ + } while (0) + +#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1)); + +#define T_REF_DEC_WITH_CB(x, p) \ + do { \ + int32_t v = atomic_sub_fetch_16(&((x)->_ref.val), 1); \ + if (v == 0 && (p)->_ref_func.end != NULL) { \ + (p)->_ref_func.end((x)); \ + } \ + } while (0) + +#define T_REF_VAL_CHECK(x) assert((x)->_ref.val >= 0); + +#define T_REF_VAL_GET(x) (x)->_ref.val + +#endif // TDENGINE_TREF_H diff --git a/src/util/src/hash.c b/src/util/src/hash.c index c69fbb772314d6a3569ee38ac043eac7fa413229..28af28e507c167d1ef2087ae3fae0e6dd9511607 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -24,8 +24,8 @@ static FORCE_INLINE void __wr_lock(void *lock) { if (lock == NULL) { return; } - -#if defined (LINUX) + +#if defined(LINUX) pthread_rwlock_wrlock(lock); #else pthread_mutex_lock(lock); @@ -36,8 +36,8 @@ static FORCE_INLINE void __rd_lock(void *lock) { if (lock == NULL) { return; } - -#if defined (LINUX) + +#if defined(LINUX) pthread_rwlock_rdlock(lock); #else pthread_mutex_lock(lock); @@ -48,8 +48,8 @@ static FORCE_INLINE void __unlock(void *lock) { if (lock == NULL) { return; } - -#if defined (LINUX) + +#if defined(LINUX) pthread_rwlock_unlock(lock); #else pthread_mutex_unlock(lock); @@ -60,8 +60,8 @@ static FORCE_INLINE int32_t __lock_init(void *lock) { if (lock == NULL) { return 0; } - -#if defined (LINUX) + +#if defined(LINUX) return pthread_rwlock_init(lock, NULL); #else return pthread_mutex_init(lock, NULL); @@ -72,8 +72,8 @@ static FORCE_INLINE void __lock_destroy(void *lock) { if (lock == NULL) { return; } - -#if defined (LINUX) + +#if defined(LINUX) pthread_rwlock_destroy(lock); #else pthread_mutex_destroy(lock); @@ -107,7 +107,7 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) { /** * get SHashNode from hashlist, nodes from trash are not included. - * @param pHashObj Cache objection + * @param pHashObj Cache objection * @param key key for hash * @param keyLen key length * @return @@ -155,24 +155,24 @@ static void taosHashTableResize(SHashObj *pHashObj) { int32_t newSize = pHashObj->capacity << 1U; if (newSize > HASH_MAX_CAPACITY) { - pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", pHashObj->capacity, - HASH_MAX_CAPACITY); + pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", + pHashObj->capacity, HASH_MAX_CAPACITY); return; } int64_t st = taosGetTimestampUs(); - SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry*) * newSize); + SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize); if (pNewEntry == NULL) { pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity); return; } pHashObj->hashList = pNewEntry; - for(int32_t i = pHashObj->capacity; i < newSize; ++i) { + for (int32_t i = pHashObj->capacity; i < newSize; ++i) { pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry)); } - + pHashObj->capacity = newSize; for (int32_t i = 0; i < pHashObj->capacity; ++i) { @@ -182,7 +182,7 @@ static void taosHashTableResize(SHashObj *pHashObj) { if (pNode != NULL) { assert(pNode->prev1 == pEntry && pEntry->num > 0); } - + while (pNode) { int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity); if (j == i) { // this key resides in the same slot, no need to relocate it @@ -192,13 +192,13 @@ static void taosHashTableResize(SHashObj *pHashObj) { // remove from current slot assert(pNode->prev1 != NULL); - - if (pNode->prev1 == pEntry) { // first node of the overflow linked list + + if (pNode->prev1 == pEntry) { // first node of the overflow linked list pEntry->next = pNode->next; } else { pNode->prev->next = pNode->next; } - + pEntry->num--; assert(pEntry->num >= 0); @@ -214,13 +214,13 @@ static void taosHashTableResize(SHashObj *pHashObj) { if (pNewIndexEntry->next != NULL) { assert(pNewIndexEntry->next->prev1 == pNewIndexEntry); - + pNewIndexEntry->next->prev = pNode; } - + pNode->next = pNewIndexEntry->next; pNode->prev1 = pNewIndexEntry; - + pNewIndexEntry->next = pNode; pNewIndexEntry->num++; @@ -258,14 +258,14 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { pHashObj->hashFp = fn; - pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry*)); + pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry *)); if (pHashObj->hashList == NULL) { free(pHashObj); pError("failed to allocate memory, reason:%s", strerror(errno)); return NULL; } - - for(int32_t i = 0; i < pHashObj->capacity; ++i) { + + for (int32_t i = 0; i < pHashObj->capacity; ++i) { pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry)); } @@ -276,7 +276,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) { pHashObj->lock = calloc(1, sizeof(pthread_mutex_t)); #endif } - + if (__lock_init(pHashObj->lock) != 0) { free(pHashObj->hashList); free(pHashObj); @@ -347,7 +347,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) { int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity); SHashEntry *pEntry = pHashObj->hashList[index]; - + pNode->next = pEntry->next; if (pEntry->next) { @@ -356,7 +356,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) { pEntry->next = pNode; pNode->prev1 = pEntry; - + pEntry->num++; pHashObj->size++; } @@ -365,7 +365,7 @@ size_t taosHashGetSize(const SHashObj *pHashObj) { if (pHashObj == NULL) { return 0; } - + return pHashObj->size; } @@ -430,7 +430,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen) { void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) { __wr_lock(pHashObj->lock); - uint32_t val = 0; + uint32_t val = 0; SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &val); if (pNode == NULL) { __unlock(pHashObj->lock); @@ -446,13 +446,13 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) { pNode->prev->next = pNext; } } - + if (pNext != NULL) { pNext->prev = pNode->prev; } uint32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity); - + SHashEntry *pEntry = pHashObj->hashList[index]; pEntry->num--; @@ -483,10 +483,14 @@ void taosHashCleanup(SHashObj *pHashObj) { while (pNode) { pNext = pNode->next; + if (pHashObj->freeFp) { + pHashObj->freeFp(pNode->data); + } + free(pNode); pNode = pNext; } - + tfree(pEntry); } @@ -496,24 +500,122 @@ void taosHashCleanup(SHashObj *pHashObj) { __unlock(pHashObj->lock); __lock_destroy(pHashObj->lock); + tfree(pHashObj->lock); memset(pHashObj, 0, sizeof(SHashObj)); free(pHashObj); } +void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp) { + if (pHashObj == NULL || freeFp == NULL) { + return; + } + + pHashObj->freeFp = freeFp; +} + +SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) { + SHashMutableIterator *pIter = calloc(1, sizeof(SHashMutableIterator)); + if (pIter == NULL) { + return NULL; + } + + pIter->pHashObj = pHashObj; +} + +static SHashNode *getNextHashNode(SHashMutableIterator *pIter) { + assert(pIter != NULL); + + while (pIter->entryIndex < pIter->pHashObj->capacity) { + SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex]; + if (pEntry->next == NULL) { + pIter->entryIndex++; + continue; + } + + return pEntry->next; + } + + return NULL; +} + +bool taosHashIterNext(SHashMutableIterator *pIter) { + if (pIter == NULL) { + return false; + } + + size_t size = taosHashGetSize(pIter->pHashObj); + if (size == 0 || pIter->num >= size) { + return false; + } + + // check the first one + if (pIter->num == 0) { + assert(pIter->pCur == NULL && pIter->pNext == NULL); + + while (1) { + SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex]; + if (pEntry->next == NULL) { + pIter->entryIndex++; + continue; + } + + pIter->pCur = pEntry->next; + + if (pIter->pCur->next) { + pIter->pNext = pIter->pCur->next; + } else { + pIter->pNext = getNextHashNode(pIter); + } + + break; + } + + pIter->num++; + return true; + } else { + assert(pIter->pCur != NULL); + if (pIter->pNext) { + pIter->pCur = pIter->pNext; + } else { // no more data in the hash list + return false; + } + + pIter->num++; + + if (pIter->pCur->next) { + pIter->pNext = pIter->pCur->next; + } else { + pIter->pNext = getNextHashNode(pIter); + } + + return true; + } +} + +void *taosHashIterGet(SHashMutableIterator *iter) { return (iter == NULL) ? NULL : iter->pCur->data; } + +void *taosHashDestroyIter(SHashMutableIterator *iter) { + if (iter == NULL) { + return NULL; + } + + free(iter); +} + // for profile only -int32_t taosHashGetMaxOverflowLinkLength(const SHashObj* pHashObj) { +int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) { if (pHashObj == NULL || pHashObj->size == 0) { return 0; } - + int32_t num = 0; - - for(int32_t i = 0; i < pHashObj->size; ++i) { + + for (int32_t i = 0; i < pHashObj->size; ++i) { SHashEntry *pEntry = pHashObj->hashList[i]; if (num < pEntry->num) { num = pEntry->num; } } - + return num; }