diff --git a/src/client/src/tcache.c b/src/client/src/tcache.c
index f9941c1c0f2102c7fc2b0c31eb264cbabcb3498a..ff741d1fd95d1daea2dc1416dc8b38f2d08be180 100644
--- a/src/client/src/tcache.c
+++ b/src/client/src/tcache.c
@@ -13,86 +13,17 @@
* along with this program. If not, see .
*/
-#include "os.h"
-
-#include "hashfunc.h"
#include "tcache.h"
+#include "hash.h"
+#include "hashfunc.h"
+
#include "tlog.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
-#define HASH_MAX_CAPACITY (1024*1024*16)
-#define HASH_VALUE_IN_TRASH (-1)
-#define HASH_DEFAULT_LOAD_FACTOR (0.75)
-#define HASH_INDEX(v, c) ((v) & ((c)-1))
-
-/**
- * todo: refactor to extract the hash table out of cache structure
- */
-typedef struct SCacheStatis {
- int64_t missCount;
- int64_t hitCount;
- int64_t totalAccess;
- int64_t refreshCount;
- int32_t numOfCollision;
- int32_t numOfResize;
- int64_t resizeTime;
-} SCacheStatis;
-
-typedef struct _cache_node_t {
- char * key; // null-terminated string
- struct _cache_node_t *prev;
- struct _cache_node_t *next;
- uint64_t addTime; // the time when this element is added or updated into cache
- uint64_t time; // end time when this element should be remove from cache
- uint64_t signature;
-
- /*
- * reference count for this object
- * if this value is larger than 0, this value will never be released
- */
- uint32_t refCount;
- uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
- uint32_t nodeSize; // allocated size for current SDataNode
- char data[];
-} SDataNode;
-
-typedef uint32_t (*_hashFunc)(const char *, uint32_t);
-
-typedef struct {
- SDataNode **hashList;
- int capacity;
- int size;
- int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
- int64_t refreshTime;
-
- /*
- * to accommodate the old datanode which has the same key value of new one in hashList
- * when an new node is put into cache, if an existed one with the same key:
- * 1. if the old one does not be referenced, update it.
- * 2. otherwise, move the old one to pTrash, add the new one.
- *
- * when the node in pTrash does not be referenced, it will be release at the expired time
- */
- SDataNode * pTrash;
- void * tmrCtrl;
- void * pTimer;
- SCacheStatis statistics;
- _hashFunc hashFp;
- int numOfElemsInTrash; // number of element in trash
- int16_t deleting; // set the deleting flag to stop refreshing asap.
-
-#if defined LINUX
- pthread_rwlock_t lock;
-#else
- pthread_mutex_t lock;
-#endif
-
-} SCacheObj;
-
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) {
-#if defined LINUX
+#if defined(LINUX)
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->lock);
@@ -100,7 +31,7 @@ static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) {
}
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) {
-#if defined LINUX
+#if defined(LINUX)
pthread_rwlock_rdlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->lock);
@@ -108,7 +39,7 @@ static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) {
}
static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) {
-#if defined LINUX
+#if defined(LINUX)
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->lock);
@@ -116,7 +47,7 @@ static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) {
}
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) {
-#if defined LINUX
+#if defined(LINUX)
return pthread_rwlock_init(&pObj->lock, NULL);
#else
return pthread_mutex_init(&pObj->lock, NULL);
@@ -124,19 +55,16 @@ static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) {
}
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pObj) {
-#if defined LINUX
+#if defined(LINUX)
pthread_rwlock_destroy(&pObj->lock);
#else
pthread_mutex_destroy(&pObj->lock);
#endif
}
-static FORCE_INLINE int32_t taosHashTableLength(int32_t length) {
- int32_t trueLength = MIN(length, HASH_MAX_CAPACITY);
-
- int32_t i = 4;
- while (i < trueLength) i = (i << 1);
- return i;
+static FORCE_INLINE void taosFreeNode(void *data) {
+ SCacheDataNode *pNode = *(SCacheDataNode **)data;
+ free(pNode);
}
/**
@@ -145,86 +73,83 @@ static FORCE_INLINE int32_t taosHashTableLength(int32_t length) {
* @param pData actually data. required a consecutive memory block, no pointer is allowed
* in pData. Pointer copy causes memory access error.
* @param size size of block
- * @param lifespan total survial time from now
- * @return SDataNode
+ * @param lifespan total survial expiredTime from now
+ * @return SCacheDataNode
*/
-static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const char *pData, size_t dataSize,
- uint64_t lifespan) {
- size_t totalSize = dataSize + sizeof(SDataNode) + keyLen;
-
- SDataNode *pNewNode = calloc(1, totalSize);
+static SCacheDataNode *taosCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t size,
+ uint64_t duration) {
+ size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
+
+ SCacheDataNode *pNewNode = calloc(1, totalSize);
if (pNewNode == NULL) {
pError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
-
- memcpy(pNewNode->data, pData, dataSize);
- pNewNode->addTime = (uint64_t)taosGetTimestampMs();
- pNewNode->time = pNewNode->addTime + lifespan;
-
- pNewNode->key = pNewNode->data + dataSize;
- strcpy(pNewNode->key, key);
-
+
+ memcpy(pNewNode->data, pData, size);
+
+ pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
+ pNewNode->keySize = keyLen;
+
+ memcpy(pNewNode->key, key, keyLen);
+
+ pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
+ pNewNode->expiredTime = pNewNode->addedTime + duration;
+
pNewNode->signature = (uint64_t)pNewNode;
- pNewNode->nodeSize = (uint32_t)totalSize;
-
+ pNewNode->size = (uint32_t)totalSize;
+
return pNewNode;
}
/**
- * hash key function
- *
- * @param key key string
- * @param len length of key
- * @return hash value
- */
-static FORCE_INLINE uint32_t taosHashKey(const char *key, uint32_t len) { return MurmurHash3_32(key, len); }
-
-/**
- * add object node into trash, and this object is closed for referencing if it is add to trash
+ * addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
* It will be removed until the pNode->refCount == 0
* @param pObj Cache object
* @param pNode Cache slot object
*/
-static void taosAddToTrash(SCacheObj *pObj, SDataNode *pNode) {
- if (pNode->hashVal == HASH_VALUE_IN_TRASH) { /* node is already in trash */
+static void taosAddToTrash(SCacheObj *pObj, SCacheDataNode *pNode) {
+ if (pNode->inTrash) { /* node is already in trash */
return;
}
-
- pNode->next = pObj->pTrash;
+
+ STrashElem *pElem = calloc(1, sizeof(STrashElem));
+ pElem->pData = pNode;
+
+ pElem->next = pObj->pTrash;
if (pObj->pTrash) {
- pObj->pTrash->prev = pNode;
+ pObj->pTrash->prev = pElem;
}
-
- pNode->prev = NULL;
- pObj->pTrash = pNode;
-
- pNode->hashVal = HASH_VALUE_IN_TRASH;
+
+ pElem->prev = NULL;
+ pObj->pTrash = pElem;
+
+ pNode->inTrash = true;
pObj->numOfElemsInTrash++;
-
+
pTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pObj->numOfElemsInTrash);
}
-static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) {
- if (pNode->signature != (uint64_t)pNode) {
- pError("key:sig:%d %p data has been released, ignore", pNode->signature, pNode);
+static void taosRemoveFromTrash(SCacheObj *pObj, STrashElem *pElem) {
+ if (pElem->pData->signature != (uint64_t)pElem->pData) {
+ pError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return;
}
-
+
pObj->numOfElemsInTrash--;
- if (pNode->prev) {
- pNode->prev->next = pNode->next;
- } else {
- /* pnode is the header, update header */
- pObj->pTrash = pNode->next;
+ if (pElem->prev) {
+ pElem->prev->next = pElem->next;
+ } else { /* pnode is the header, update header */
+ pObj->pTrash = pElem->next;
}
-
- if (pNode->next) {
- pNode->next->prev = pNode->prev;
+
+ if (pElem->next) {
+ pElem->next->prev = pElem->prev;
}
-
- pNode->signature = 0;
- free(pNode);
+
+ pElem->pData->signature = 0;
+ free(pElem->pData);
+ free(pElem);
}
/**
* remove nodes in trash with refCount == 0 in cache
@@ -233,236 +158,57 @@ static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) {
* @param force force model, if true, remove data in trash without check refcount.
* may cause corruption. So, forece model only applys before cache is closed
*/
-static void taosClearCacheTrash(SCacheObj *pObj, bool force) {
+static void taosTrashEmpty(SCacheObj *pObj, bool force) {
__cache_wr_lock(pObj);
-
+
if (pObj->numOfElemsInTrash == 0) {
if (pObj->pTrash != NULL) {
pError("key:inconsistency data in cache, numOfElem in trash:%d", pObj->numOfElemsInTrash);
}
pObj->pTrash = NULL;
-
+
__cache_unlock(pObj);
return;
}
-
- SDataNode *pNode = pObj->pTrash;
-
- while (pNode) {
- if (pNode->refCount < 0) {
- pError("key:%s %p in trash released more than referenced, removed", pNode->key, pNode);
- pNode->refCount = 0;
- }
-
- if (pNode->next == pNode) {
- pNode->next = NULL;
+
+ STrashElem *pElem = pObj->pTrash;
+
+ while (pElem) {
+ T_REF_VAL_CHECK(pElem->pData);
+ if (pElem->next == pElem) {
+ pElem->next = NULL;
}
-
- if (force || (pNode->refCount == 0)) {
- pTrace("key:%s %p removed from trash. numOfElem in trash:%d", pNode->key, pNode, pObj->numOfElemsInTrash - 1)
- SDataNode *pTmp = pNode;
- pNode = pNode->next;
- taosRemoveFromTrash(pObj, pTmp);
+
+ if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
+ pTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
+ pObj->numOfElemsInTrash - 1);
+ STrashElem *p = pElem;
+
+ pElem = pElem->next;
+ taosRemoveFromTrash(pObj, p);
} else {
- pNode = pNode->next;
+ pElem = pElem->next;
}
}
-
+
assert(pObj->numOfElemsInTrash >= 0);
__cache_unlock(pObj);
}
-/**
- * add data node into cache
- * @param pObj cache object
- * @param pNode Cache slot object
- */
-static void taosAddNodeToHashTable(SCacheObj *pObj, SDataNode *pNode) {
- int32_t slotIndex = HASH_INDEX(pNode->hashVal, pObj->capacity);
- pNode->next = pObj->hashList[slotIndex];
-
- if (pObj->hashList[slotIndex] != NULL) {
- (pObj->hashList[slotIndex])->prev = pNode;
- pObj->statistics.numOfCollision++;
- }
- pObj->hashList[slotIndex] = pNode;
-
- pObj->size++;
- pObj->totalSize += pNode->nodeSize;
-
- pTrace("key:%s %p add to hash table", pNode->key, pNode);
-}
-
-/**
- * remove node in hash list
- * @param pObj
- * @param pNode
- */
-static void taosRemoveNodeInHashTable(SCacheObj *pObj, SDataNode *pNode) {
- if (pNode->hashVal == HASH_VALUE_IN_TRASH) return;
-
- SDataNode *pNext = pNode->next;
- if (pNode->prev != NULL) {
- pNode->prev->next = pNext;
- } else { /* the node is in hashlist, remove it */
- pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNext;
- }
-
- if (pNext != NULL) {
- pNext->prev = pNode->prev;
- }
-
- pObj->size--;
- pObj->totalSize -= pNode->nodeSize;
-
- pNode->next = NULL;
- pNode->prev = NULL;
-
- pTrace("key:%s %p remove from hashtable", pNode->key, pNode);
-}
-
-/**
- * in-place node in hashlist
- * @param pObj cache object
- * @param pNode data node
- */
-static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) {
- assert(pNode->hashVal >= 0);
-
- if (pNode->prev) {
- pNode->prev->next = pNode;
- } else {
- pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNode;
- }
-
- if (pNode->next) {
- (pNode->next)->prev = pNode;
- }
-
- pTrace("key:%s %p update hashtable", pNode->key, pNode);
-}
-
-/**
- * get SDataNode from hashlist, nodes from trash are not included.
- * @param pObj Cache objection
- * @param key key for hash
- * @param keyLen key length
- * @return
- */
-static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, const char *key, uint32_t keyLen) {
- uint32_t hash = (*pObj->hashFp)(key, keyLen);
-
- int32_t slot = HASH_INDEX(hash, pObj->capacity);
- SDataNode *pNode = pObj->hashList[slot];
-
- while (pNode) {
- if (strcmp(pNode->key, key) == 0) break;
-
- pNode = pNode->next;
- }
-
- if (pNode) {
- assert(HASH_INDEX(pNode->hashVal, pObj->capacity) == slot);
- }
-
- return pNode;
-}
-
-/**
- * resize the hash list if the threshold is reached
- *
- * @param pObj
- */
-static void taosHashTableResize(SCacheObj *pObj) {
- if (pObj->size < pObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
- return;
- }
-
- // double the original capacity
- pObj->statistics.numOfResize++;
- SDataNode *pNode = NULL;
- SDataNode *pNext = NULL;
-
- int32_t newSize = pObj->capacity << 1;
- if (newSize > HASH_MAX_CAPACITY) {
- pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
- pObj->capacity, HASH_MAX_CAPACITY);
- return;
- }
-
- int64_t st = taosGetTimestampUs();
- SDataNode **pList = realloc(pObj->hashList, sizeof(SDataNode *) * newSize);
- if (pList == NULL) {
- pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity);
- return;
- }
-
- pObj->hashList = pList;
-
- int32_t inc = newSize - pObj->capacity;
- memset(&pObj->hashList[pObj->capacity], 0, inc * sizeof(SDataNode *));
-
- pObj->capacity = newSize;
-
- for (int32_t i = 0; i < pObj->capacity; ++i) {
- pNode = pObj->hashList[i];
-
- while (pNode) {
- int32_t j = HASH_INDEX(pNode->hashVal, pObj->capacity);
- if (j == i) { // this key resides in the same slot, no need to relocate it
- pNode = pNode->next;
- } else {
- pNext = pNode->next;
-
- // remove from current slot
- if (pNode->prev != NULL) {
- pNode->prev->next = pNode->next;
- } else {
- pObj->hashList[i] = pNode->next;
- }
-
- if (pNode->next != NULL) {
- (pNode->next)->prev = pNode->prev;
- }
-
- // added into new slot
- pNode->next = NULL;
- pNode->prev = NULL;
-
- pNode->next = pObj->hashList[j];
-
- if (pObj->hashList[j] != NULL) {
- (pObj->hashList[j])->prev = pNode;
- }
- pObj->hashList[j] = pNode;
-
- // continue
- pNode = pNext;
- }
- }
- }
-
- int64_t et = taosGetTimestampUs();
- pObj->statistics.resizeTime += (et - st);
-
- pTrace("cache resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pObj->capacity,
- ((double)pObj->size) / pObj->capacity, (et - st) / 1000.0);
-}
-
/**
* release node
* @param pObj cache object
* @param pNode data node
*/
-static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SDataNode *pNode) {
- taosRemoveNodeInHashTable(pObj, pNode);
+static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SCacheDataNode *pNode) {
if (pNode->signature != (uint64_t)pNode) {
pError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
return;
}
-
- pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->size, pObj->totalSize);
- pNode->signature = 0;
+
+ taosHashRemove(pObj->pHashTable, pNode->key, pNode->keySize);
+ pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->totalSize, pObj->totalSize);
+
free(pNode);
}
@@ -471,8 +217,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SDataNode *pNode)
* @param pObj
* @param pNode
*/
-static FORCE_INLINE void taosCacheMoveNodeToTrash(SCacheObj *pObj, SDataNode *pNode) {
- taosRemoveNodeInHashTable(pObj, pNode);
+static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pObj, SCacheDataNode *pNode) {
+ taosHashRemove(pObj->pHashTable, pNode->key, pNode->keySize);
taosAddToTrash(pObj, pNode);
}
@@ -486,56 +232,53 @@ static FORCE_INLINE void taosCacheMoveNodeToTrash(SCacheObj *pObj, SDataNode *pN
* @param dataSize
* @return
*/
-static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *key, int32_t keyLen, void *pData,
- uint32_t dataSize, uint64_t keepTime) {
- SDataNode *pNewNode = NULL;
-
+static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SCacheDataNode *pNode, char *key, int32_t keyLen,
+ void *pData, uint32_t dataSize, uint64_t duration) {
+ SCacheDataNode *pNewNode = NULL;
+
// only a node is not referenced by any other object, in-place update it
- if (pNode->refCount == 0) {
- size_t newSize = sizeof(SDataNode) + dataSize + keyLen;
-
- pNewNode = (SDataNode *)realloc(pNode, newSize);
+ if (T_REF_VAL_GET(pNode) == 0) {
+ size_t newSize = sizeof(SCacheDataNode) + dataSize + keyLen;
+
+ pNewNode = (SCacheDataNode *)realloc(pNode, newSize);
if (pNewNode == NULL) {
return NULL;
}
-
+
pNewNode->signature = (uint64_t)pNewNode;
memcpy(pNewNode->data, pData, dataSize);
-
- pNewNode->key = pNewNode->data + dataSize;
- strcpy(pNewNode->key, key);
-
+
+ pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + dataSize;
+ pNewNode->keySize = keyLen;
+ memcpy(pNewNode->key, key, keyLen);
+
// update the timestamp information for updated key/value
- pNewNode->addTime = taosGetTimestampMs();
- pNewNode->time = pNewNode->addTime + keepTime;
-
- atomic_add_fetch_32(&pNewNode->refCount, 1);
-
+ pNewNode->addedTime = taosGetTimestampMs();
+ pNewNode->expiredTime = pNewNode->addedTime + duration;
+
+ T_REF_INC(pNewNode);
+
// the address of this node may be changed, so the prev and next element should update the corresponding pointer
- taosUpdateInHashTable(pObj, pNewNode);
+ taosHashPut(pObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
} else {
- int32_t hashVal = pNode->hashVal;
- taosCacheMoveNodeToTrash(pObj, pNode);
-
- pNewNode = taosCreateHashNode(key, keyLen, pData, dataSize, keepTime);
+ taosCacheMoveToTrash(pObj, pNode);
+
+ pNewNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration);
if (pNewNode == NULL) {
return NULL;
}
-
- atomic_add_fetch_32(&pNewNode->refCount, 1);
-
- assert(hashVal == (*pObj->hashFp)(key, keyLen - 1));
- pNewNode->hashVal = hashVal;
-
- // add new element to hashtable
- taosAddNodeToHashTable(pObj, pNewNode);
+
+ T_REF_INC(pNewNode);
+
+ // addedTime new element to hashtable
+ taosHashPut(pObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
}
-
+
return pNewNode;
}
/**
- * add data into hash table
+ * addedTime data into hash table
* @param key
* @param pData
* @param size
@@ -544,206 +287,31 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
* @param pNode
* @return
*/
-static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, uint32_t keyLen, const char *pData,
- int dataSize, uint64_t lifespan) {
- SDataNode *pNode = taosCreateHashNode(key, keyLen, pData, dataSize, lifespan);
+static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, size_t keyLen, const void *pData,
+ size_t dataSize, uint64_t duration) {
+ SCacheDataNode *pNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration);
if (pNode == NULL) {
return NULL;
}
-
- atomic_add_fetch_32(&pNode->refCount, 1);
- pNode->hashVal = (*pObj->hashFp)(key, keyLen - 1);
- taosAddNodeToHashTable(pObj, pNode);
-
+
+ T_REF_INC(pNode);
+ taosHashPut(pObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
return pNode;
}
-/**
- * add data into cache
- *
- * @param handle cache object
- * @param key key
- * @param pData cached data
- * @param dataSize data size
- * @param keepTime survival time in second
- * @return cached element
- */
-void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, int keepTime) {
- SDataNode *pNode;
- SCacheObj *pObj;
-
- pObj = (SCacheObj *)handle;
- if (pObj == NULL || pObj->capacity == 0) return NULL;
-
- uint32_t keyLen = (uint32_t)strlen(key) + 1;
-
+static void doCleanupDataCache(SCacheObj *pObj) {
__cache_wr_lock(pObj);
-
- SDataNode *pOldNode = taosGetNodeFromHashTable(pObj, key, keyLen - 1);
-
- if (pOldNode == NULL) { // do add to cache
- // check if the threshold is reached
- taosHashTableResize(pObj);
-
- pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, keepTime * 1000L);
- if (NULL != pNode) {
- pTrace(
- "key:%s %p added into cache, slot:%d, addTime:%" PRIu64 ", expireTime:%" PRIu64 ", cache total:%d, "
- "size:%" PRId64 " bytes, collision:%d",
- pNode->key, pNode, HASH_INDEX(pNode->hashVal, pObj->capacity), pNode->addTime, pNode->time, pObj->size,
- pObj->totalSize, pObj->statistics.numOfCollision);
- }
- } else { // old data exists, update the node
- pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L);
- pTrace("key:%s %p exist in cache, updated", key, pNode);
- }
-
- __cache_unlock(pObj);
-
- return (pNode != NULL) ? pNode->data : NULL;
-}
-
-static FORCE_INLINE void taosDecRef(SDataNode *pNode) {
- if (pNode == NULL) {
- return;
- }
-
- if (pNode->refCount > 0) {
- atomic_sub_fetch_32(&pNode->refCount, 1);
- pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount);
- } else {
- /*
- * safety check.
- * app may false releases cached object twice, to decrease the refcount more than acquired
- */
- pError("key:%s is released by app more than referenced.refcnt:%d", pNode->key, pNode->refCount);
- }
-}
-
-/**
- * remove data in cache, the data will not be removed immediately.
- * if it is referenced by other object, it will be remain in cache
- * @param handle
- * @param data
- */
-void taosRemoveDataFromCache(void *handle, void **data, bool _remove) {
- SCacheObj *pObj = (SCacheObj *)handle;
- if (pObj == NULL || pObj->capacity == 0 || (*data) == NULL || (pObj->size + pObj->numOfElemsInTrash == 0)) return;
-
- size_t offset = offsetof(SDataNode, data);
- SDataNode *pNode = (SDataNode *)((char *)(*data) - offset);
-
- if (pNode->signature != (uint64_t)pNode) {
- pError("key: %p release invalid cache data", pNode);
- return;
- }
-
- *data = NULL;
-
- if (_remove) {
- __cache_wr_lock(pObj);
- // pNode may be released immediately by other thread after the reference count of pNode is set to 0,
- // So we need to lock it in the first place.
- taosDecRef(pNode);
- taosCacheMoveNodeToTrash(pObj, pNode);
-
- __cache_unlock(pObj);
- } else {
- taosDecRef(pNode);
- }
-}
-
-/**
- * get data from cache
- * @param handle cache object
- * @param key key
- * @return cached data or NULL
- */
-void *taosGetDataFromCache(void *handle, char *key) {
- SCacheObj *pObj = (SCacheObj *)handle;
- if (pObj == NULL || pObj->capacity == 0) return NULL;
-
- uint32_t keyLen = (uint32_t)strlen(key);
-
- __cache_rd_lock(pObj);
-
- SDataNode *ptNode = taosGetNodeFromHashTable(handle, key, keyLen);
- if (ptNode != NULL) {
- atomic_add_fetch_32(&ptNode->refCount, 1);
- }
-
- __cache_unlock(pObj);
-
- if (ptNode != NULL) {
- atomic_add_fetch_32(&pObj->statistics.hitCount, 1);
- pTrace("key:%s is retrieved from cache,refcnt:%d", key, ptNode->refCount);
- } else {
- atomic_add_fetch_32(&pObj->statistics.missCount, 1);
- pTrace("key:%s not in cache,retrieved failed", key);
- }
-
- atomic_add_fetch_32(&pObj->statistics.totalAccess, 1);
- return (ptNode != NULL) ? ptNode->data : NULL;
-}
-
-/**
- * update data in cache
- * @param handle hash object handle(pointer)
- * @param key key for hash
- * @param pData actually data
- * @param size length of data
- * @return new referenced data
- */
-void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration) {
- SCacheObj *pObj = (SCacheObj *)handle;
- if (pObj == NULL || pObj->capacity == 0) return NULL;
-
- SDataNode *pNew = NULL;
-
- uint32_t keyLen = strlen(key) + 1;
-
- __cache_wr_lock(pObj);
-
- SDataNode *pNode = taosGetNodeFromHashTable(handle, key, keyLen - 1);
-
- if (pNode == NULL) { // object has been released, do add operation
- pNew = taosAddToCacheImpl(pObj, key, keyLen, pData, size, duration * 1000L);
- pWarn("key:%s does not exist, update failed,do add to cache.total:%d,size:%ldbytes", key, pObj->size,
- pObj->totalSize);
- } else {
- pNew = taosUpdateCacheImpl(pObj, pNode, key, keyLen, pData, size, duration * 1000L);
- pTrace("key:%s updated.expireTime:%" PRIu64 ".refCnt:%d", key, pNode->time, pNode->refCount);
- }
-
- __cache_unlock(pObj);
- return (pNew != NULL) ? pNew->data : NULL;
-}
-
-static void doCleanUpDataCache(SCacheObj* pObj) {
- SDataNode *pNode, *pNext;
-
- __cache_wr_lock(pObj);
-
- if (pObj->hashList && pObj->size > 0) {
- for (int i = 0; i < pObj->capacity; ++i) {
- pNode = pObj->hashList[i];
- while (pNode) {
- pNext = pNode->next;
- free(pNode);
- pNode = pNext;
- }
- }
-
- tfree(pObj->hashList);
+
+ if (taosHashGetSize(pObj->pHashTable) > 0) {
+ taosHashCleanup(pObj->pHashTable);
}
-
+
__cache_unlock(pObj);
-
- taosClearCacheTrash(pObj, true);
+
+ taosTrashEmpty(pObj, true);
__cache_lock_destroy(pObj);
-
+
memset(pObj, 0, sizeof(SCacheObj));
-
free(pObj);
}
@@ -751,197 +319,244 @@ static void doCleanUpDataCache(SCacheObj* pObj) {
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pObj->refreshTime
* @param handle Cache object handle
*/
-void taosRefreshDataCache(void *handle, void *tmrId) {
- SDataNode *pNode, *pNext;
+static void taosCacheRefresh(void *handle, void *tmrId) {
SCacheObj *pObj = (SCacheObj *)handle;
-
- if (pObj == NULL || pObj->capacity <= 0) {
+
+ if (pObj == NULL || taosHashGetSize(pObj->pHashTable) == 0) {
pTrace("object is destroyed. no refresh retry");
return;
}
-
+
if (pObj->deleting == 1) {
- doCleanUpDataCache(pObj);
+ doCleanupDataCache(pObj);
return;
}
-
- uint64_t time = taosGetTimestampMs();
- uint32_t numOfCheck = 0;
+
+ uint64_t expiredTime = taosGetTimestampMs();
pObj->statistics.refreshCount++;
-
- int32_t num = pObj->size;
-
- for (int i = 0; i < pObj->capacity; ++i) {
- // in deleting process, quit refreshing immediately
+
+ SHashMutableIterator *pIter = taosHashCreateIter(pObj->pHashTable);
+
+ __cache_wr_lock(pObj);
+ while (taosHashIterNext(pIter)) {
if (pObj->deleting == 1) {
+ taosHashDestroyIter(pIter);
break;
}
-
- __cache_wr_lock(pObj);
- pNode = pObj->hashList[i];
-
- while (pNode) {
- numOfCheck++;
- pNext = pNode->next;
-
- if (pNode->time <= time && pNode->refCount <= 0) {
- taosCacheReleaseNode(pObj, pNode);
- }
- pNode = pNext;
- }
-
- /* all data have been checked, not need to iterate further */
- if (numOfCheck == num || pObj->size <= 0) {
- __cache_unlock(pObj);
- break;
+
+ SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
+ if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
+ taosCacheReleaseNode(pObj, pNode);
}
-
- __cache_unlock(pObj);
}
-
- if (pObj->deleting == 1) { // clean up resources and abort
- doCleanUpDataCache(pObj);
+
+ __cache_unlock(pObj);
+
+ taosHashDestroyIter(pIter);
+
+ if (pObj->deleting == 1) { // clean up resources and abort
+ doCleanupDataCache(pObj);
} else {
- taosClearCacheTrash(pObj, false);
- taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
- }
-}
-
-/**
- *
- * @param handle
- * @param tmrId
- */
-void taosClearDataCache(void *handle) {
- SDataNode *pNode, *pNext;
- SCacheObj *pObj = (SCacheObj *)handle;
-
- int32_t capacity = pObj->capacity;
-
- for (int i = 0; i < capacity; ++i) {
- __cache_wr_lock(pObj);
-
- pNode = pObj->hashList[i];
-
- while (pNode) {
- pNext = pNode->next;
- taosCacheMoveNodeToTrash(pObj, pNode);
- pNode = pNext;
- }
-
- pObj->hashList[i] = NULL;
-
- __cache_unlock(pObj);
+ taosTrashEmpty(pObj, false);
+ taosTmrReset(taosCacheRefresh, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
}
-
- taosClearCacheTrash(pObj, false);
}
-/**
- * @param capacity maximum slots available for hash elements
- * @param tmrCtrl timer ctrl
- * @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and
- * not referenced by other objects
- * @return
- */
-void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) {
- if (tmrCtrl == NULL || refreshTime <= 0 || capacity <= 0) {
+SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
+ if (tmrCtrl == NULL || refreshTime <= 0) {
return NULL;
}
-
+
SCacheObj *pObj = (SCacheObj *)calloc(1, sizeof(SCacheObj));
if (pObj == NULL) {
pError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
-
- // the max slots is not defined by user
- pObj->capacity = taosHashTableLength(capacity);
- assert((pObj->capacity & (pObj->capacity - 1)) == 0);
-
- pObj->hashFp = taosHashKey;
- pObj->refreshTime = refreshTime * 1000;
-
- pObj->hashList = (SDataNode **)calloc(1, sizeof(SDataNode *) * pObj->capacity);
- if (pObj->hashList == NULL) {
+
+ pObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
+ if (pObj->pHashTable == NULL) {
free(pObj);
pError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
-
+
+ // set free cache node callback function for hash table
+ taosHashSetFreecb(pObj->pHashTable, taosFreeNode);
+
+ pObj->refreshTime = refreshTime * 1000;
pObj->tmrCtrl = tmrCtrl;
- taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
-
+
+ taosTmrReset(taosCacheRefresh, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
+
if (__cache_lock_init(pObj) != 0) {
taosTmrStopA(&pObj->pTimer);
- free(pObj->hashList);
+ taosHashCleanup(pObj->pHashTable);
free(pObj);
-
+
pError("failed to init lock, reason:%s", strerror(errno));
return NULL;
}
+
+ return pObj;
+}
- return (void *)pObj;
+void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int duration) {
+ SCacheDataNode *pNode;
+ SCacheObj * pObj;
+
+ pObj = (SCacheObj *)handle;
+ if (pObj == NULL || pObj->pHashTable == NULL) {
+ return NULL;
+ }
+
+ size_t keyLen = strlen(key);
+
+ __cache_wr_lock(pObj);
+ SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pObj->pHashTable, key, keyLen);
+ SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;
+
+ if (pOld == NULL) { // do addedTime to cache
+ pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, duration * 1000L);
+ if (NULL != pNode) {
+ pTrace("key:%s %p added into cache, addedTime:%" PRIu64 ", expireTime:%" PRIu64 ", cache total:%d, size:%" PRId64
+ " bytes, collision:%d",
+ key, pNode, pNode->addedTime, pNode->expiredTime, dataSize, pObj->totalSize,
+ pObj->statistics.numOfCollision);
+ }
+ } else { // old data exists, update the node
+ pNode = taosUpdateCacheImpl(pObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
+ pTrace("key:%s %p exist in cache, updated", key, pNode);
+ }
+
+ __cache_unlock(pObj);
+
+ return (pNode != NULL) ? pNode->data : NULL;
}
-/**
- * release all allocated memory and destroy the cache object.
- *
- * This function only set the deleting flag, and the specific work of clean up cache is delegated to
- * taosRefreshDataCache function, which will executed every SCacheObj->refreshTime sec.
- *
- * If the value of SCacheObj->refreshTime is too large, the taosRefreshDataCache function may not be invoked
- * before the main thread terminated, in which case all allocated resources are simply recycled by OS.
- *
- * @param handle
- */
-void taosCleanUpDataCache(void *handle) {
+void *taosCacheAcquireByName(void *handle, char *key) {
SCacheObj *pObj = (SCacheObj *)handle;
- if (pObj == NULL) {
- return;
+ if (pObj == NULL || taosHashGetSize(pObj->pHashTable) == 0) {
+ return NULL;
}
-
- pObj->deleting = 1;
+
+ uint32_t keyLen = (uint32_t)strlen(key);
+
+ __cache_rd_lock(pObj);
+
+ SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pObj->pHashTable, key, keyLen);
+ if (ptNode != NULL) {
+ T_REF_INC(*ptNode);
+ }
+
+ __cache_unlock(pObj);
+
+ if (ptNode != NULL) {
+ atomic_add_fetch_32(&pObj->statistics.hitCount, 1);
+ pTrace("key:%s is retrieved from cache,refcnt:%d", key, T_REF_VAL_GET(*ptNode));
+ } else {
+ atomic_add_fetch_32(&pObj->statistics.missCount, 1);
+ pTrace("key:%s not in cache,retrieved failed", key);
+ }
+
+ atomic_add_fetch_32(&pObj->statistics.totalAccess, 1);
+ return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
-void* taosGetDataFromExists(void* handle, void* data) {
+void *taosCacheAcquireByData(void *handle, void *data) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || data == NULL) return NULL;
- size_t offset = offsetof(SDataNode, data);
- SDataNode *ptNode = (SDataNode *)((char *)data - offset);
+ size_t offset = offsetof(SCacheDataNode, data);
+ SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
- if (ptNode->signature != (uint64_t) ptNode) {
+ if (ptNode->signature != (uint64_t)ptNode) {
pError("key: %p the data from cache is invalid", ptNode);
return NULL;
}
- int32_t ref = atomic_add_fetch_32(&ptNode->refCount, 1);
- pTrace("%p add ref data in cache, refCnt:%d", data, ref)
+ int32_t ref = T_REF_INC(ptNode);
+ pTrace("%p addedTime ref data in cache, refCnt:%d", data, ref)
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
assert(ref >= 2);
return data;
}
-void* taosTransferDataInCache(void* handle, void** data) {
+void *taosCacheTransfer(void *handle, void **data) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || data == NULL) return NULL;
- size_t offset = offsetof(SDataNode, data);
- SDataNode *ptNode = (SDataNode *)((char *)(*data) - offset);
+ size_t offset = offsetof(SCacheDataNode, data);
+ SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
- if (ptNode->signature != (uint64_t) ptNode) {
+ if (ptNode->signature != (uint64_t)ptNode) {
pError("key: %p the data from cache is invalid", ptNode);
return NULL;
}
- assert(ptNode->refCount >= 1);
+ assert(T_REF_VAL_GET(ptNode) >= 1);
- char* d = *data;
+ char *d = *data;
// clear its reference to old area
*data = NULL;
return d;
}
+
+void taosCacheRelease(void *handle, void **data, bool _remove) {
+ SCacheObj *pObj = (SCacheObj *)handle;
+ if (pObj == NULL || (*data) == NULL || (taosHashGetSize(pObj->pHashTable) + pObj->numOfElemsInTrash == 0)) {
+ return;
+ }
+
+ size_t offset = offsetof(SCacheDataNode, data);
+
+ SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
+
+ if (pNode->signature != (uint64_t)pNode) {
+ pError("key: %p release invalid cache data", pNode);
+ return;
+ }
+
+ *data = NULL;
+
+ if (_remove) {
+ __cache_wr_lock(pObj);
+ // pNode may be released immediately by other thread after the reference count of pNode is set to 0,
+ // So we need to lock it in the first place.
+ T_REF_DEC(pNode);
+ taosCacheMoveToTrash(pObj, pNode);
+
+ __cache_unlock(pObj);
+ } else {
+ T_REF_DEC(pNode);
+ }
+}
+
+void taosCacheEmpty(SCacheObj *pCacheObj) {
+ SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
+
+ __cache_wr_lock(pCacheObj);
+ while (taosHashIterNext(pIter)) {
+ if (pCacheObj->deleting == 1) {
+ taosHashDestroyIter(pIter);
+ break;
+ }
+
+ SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
+ taosCacheMoveToTrash(pCacheObj, pNode);
+ }
+ __cache_unlock(pCacheObj);
+
+ taosHashDestroyIter(pIter);
+ taosTrashEmpty(pCacheObj, false);
+}
+
+void taosCacheCleanup(SCacheObj *pCacheObj) {
+ if (pCacheObj == NULL) {
+ return;
+ }
+
+ pCacheObj->deleting = 1;
+}
diff --git a/src/client/src/tscLocal.c b/src/client/src/tscLocal.c
index 89b43816c86db232b2d48de86a839bc958296b38..0a4d7e74d6dca4f6d2857ce6902385e92842f47b 100644
--- a/src/client/src/tscLocal.c
+++ b/src/client/src/tscLocal.c
@@ -484,7 +484,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
- taosClearDataCache(tscCacheHandle);
+ taosCacheEmpty(tscCacheHandle);
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
diff --git a/src/client/src/tscServer.c b/src/client/src/tscServer.c
index 83cf638133a4d673118594483a09a365abd06e22..a7c5b0f6b92735d44dfb62c5c6490f2e100f139f 100644
--- a/src/client/src/tscServer.c
+++ b/src/client/src/tscServer.c
@@ -182,7 +182,6 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
pSql->ipList->ip[0] = inet_addr("192.168.0.1");
- SSqlCmd* pCmd = &pSql->cmd;
if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->ipList->port = tsDnodeShellPort;
@@ -2641,7 +2640,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL);
- pMeterMetaInfo->pMeterMeta = (STableMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
+ pMeterMetaInfo->pMeterMeta = (STableMeta *)taosCachePut(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
pMeta->contLen, tsMeterMetaKeepTimer);
// todo handle out of memory case
if (pMeterMetaInfo->pMeterMeta == NULL) return 0;
@@ -2750,7 +2749,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache
pMeta->index = 0;
- (void)taosAddDataIntoCache(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
+ (void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
}
pSql->res.code = TSDB_CODE_SUCCESS;
@@ -2857,9 +2856,9 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
#endif
// release the used metricmeta
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
- pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosAddDataIntoCache(tscCacheHandle, name, (char *)metricMetaList[i],
+ pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
sizes[i], tsMetricMetaKeepTimer);
tfree(metricMetaList[i]);
@@ -2917,11 +2916,11 @@ int tscProcessShowRsp(SSqlObj *pSql) {
key[0] = pCmd->msgType + 'a';
strcpy(key + 1, "showlist");
- taosRemoveDataFromCache(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);
+ taosCacheRelease(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);
int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(STableMeta);
pMeterMetaInfo->pMeterMeta =
- (STableMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
+ (STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
@@ -2975,14 +2974,14 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
}
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
- taosClearDataCache(tscCacheHandle);
+ taosCacheEmpty(tscCacheHandle);
return 0;
}
int tscProcessDropTableRsp(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
- STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
+ STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
if (pMeterMeta == NULL) {
/* not in cache, abort */
return 0;
@@ -2996,11 +2995,11 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
* instead.
*/
tscTrace("%p force release metermeta after drop table:%s", pSql, pMeterMetaInfo->name);
- taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);
+ taosCacheRelease(tscCacheHandle, (void **)&pMeterMeta, true);
if (pMeterMetaInfo->pMeterMeta) {
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
}
return 0;
@@ -3009,23 +3008,23 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
- STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
+ STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
if (pMeterMeta == NULL) { /* not in cache, abort */
return 0;
}
tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMeterMetaInfo->name);
- taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);
+ taosCacheRelease(tscCacheHandle, (void **)&pMeterMeta, true);
if (pMeterMetaInfo->pMeterMeta) {
bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
if (isSuperTable) { // if it is a super table, reset whole query cache
tscTrace("%p reset query cache since table:%s is stable", pSql, pMeterMetaInfo->name);
- taosClearDataCache(tscCacheHandle);
+ taosCacheEmpty(tscCacheHandle);
}
}
@@ -3151,7 +3150,7 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMet
* Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
*/
if (code == TSDB_CODE_SUCCESS) {
- pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
+ pMeterMetaInfo->pMeterMeta = taosCacheTransfer(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
assert(pMeterMetaInfo->pMeterMeta != NULL);
}
@@ -3177,10 +3176,10 @@ int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
// If this SMeterMetaInfo owns a metermeta, release it first
if (pMeterMetaInfo->pMeterMeta != NULL) {
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
}
- pMeterMetaInfo->pMeterMeta = (STableMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
+ pMeterMetaInfo->pMeterMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
if (pMeterMetaInfo->pMeterMeta != NULL) {
STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
@@ -3244,7 +3243,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
}
tscWaitingForCreateTable(pCmd);
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo); // todo ??
} else {
@@ -3278,9 +3277,9 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
- SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
+ SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr);
if (ppMeta == NULL) {
required = true;
break;
@@ -3308,7 +3307,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
- STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
+ STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
}
@@ -3353,8 +3352,8 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
printf("create metric key:%s, index:%d\n", tagstr, i);
#endif
- taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
- pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
+ taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
+ pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *) taosCacheAcquireByName(tscCacheHandle, tagstr);
}
}
diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c
index 3d55ff1c7267adc242ff037e2df8faa423076319..f14643a72fb2afc97a50239529ae0678c55cd208 100644
--- a/src/client/src/tscSub.c
+++ b/src/client/src/tscSub.c
@@ -371,7 +371,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->sqlstr = NULL;
taos_free_result_imp(pSql, 0);
pSql->sqlstr = sqlstr;
- taosClearDataCache(tscCacheHandle);
+ taosCacheEmpty(tscCacheHandle);
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
tscTrace("meter synchronization completed");
} else {
diff --git a/src/client/src/tscSystem.c b/src/client/src/tscSystem.c
index d11f21be0fea6853b05cf1a24ed5727f1eaae74f..866398b7f5cfeadf1d5eda6d1354128ce8ed1c75 100644
--- a/src/client/src/tscSystem.c
+++ b/src/client/src/tscSystem.c
@@ -186,7 +186,7 @@ void taos_init_imp() {
refreshTime = refreshTime > 2 ? 2 : refreshTime;
refreshTime = refreshTime < 1 ? 1 : refreshTime;
- if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime);
+ if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000);
diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c
index 320aeee27e5f183b84184c56966780dde558054e..b655832f1159efe399cfeec94f8cc00cbb671f38 100644
--- a/src/client/src/tscUtil.c
+++ b/src/client/src/tscUtil.c
@@ -505,7 +505,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
tfree(pDataBlock->params);
// free the refcount for metermeta
- taosRemoveDataFromCache(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false);
+ taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false);
tfree(pDataBlock);
}
@@ -589,9 +589,9 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
strcpy(pMeterMetaInfo->name, pDataBlock->tableId);
- taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
+ taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
- pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pDataBlock->pMeterMeta);
+ pMeterMetaInfo->pMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pMeterMeta);
} else {
assert(strncmp(pMeterMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
}
@@ -665,7 +665,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
* due to operation such as drop database. So here we add the reference count directly instead of invoke
* taosGetDataFromCache, which may return NULL value.
*/
- dataBuf->pMeterMeta = taosGetDataFromExists(tscCacheHandle, pMeterMeta);
+ dataBuf->pMeterMeta = taosCacheAcquireByData(tscCacheHandle, pMeterMeta);
assert(initialSize > 0 && pMeterMeta != NULL && dataBuf->pMeterMeta != NULL);
*dataBlocks = dataBuf;
@@ -1940,8 +1940,8 @@ void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache)
return;
}
- taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), removeFromCache);
- taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMetricMeta), removeFromCache);
+ taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), removeFromCache);
+ taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMetricMeta), removeFromCache);
}
void tscResetForNextRetrieve(SSqlRes* pRes) {
@@ -2071,16 +2071,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SMeterMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) {
- STableMeta* pMeterMeta = taosGetDataFromCache(tscCacheHandle, name);
- SSuperTableMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key);
+ STableMeta* pMeterMeta = taosCacheAcquireByName(tscCacheHandle, name);
+ SSuperTableMeta* pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object.
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
- STableMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta);
- SSuperTableMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
+ STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta);
+ SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
diff --git a/src/query/inc/tcache.h b/src/query/inc/tcache.h
index b577c53ea8dbcdc9f069288b94d0244907e77f12..6f6ef17d8d34cc851111ec760ad4e19ff2788b87 100644
--- a/src/query/inc/tcache.h
+++ b/src/query/inc/tcache.h
@@ -20,7 +20,63 @@
extern "C" {
#endif
-#include
+#include "os.h"
+#include "tref.h"
+#include "hash.h"
+
+typedef struct SCacheStatis {
+ int64_t missCount;
+ int64_t hitCount;
+ int64_t totalAccess;
+ int64_t refreshCount;
+ int32_t numOfCollision;
+} SCacheStatis;
+
+typedef struct SCacheDataNode {
+ uint64_t addedTime; // the added time when this element is added or updated into cache
+ uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache
+ uint64_t signature;
+ uint32_t size; // allocated size for current SCacheDataNode
+ uint16_t keySize : 15;
+ bool inTrash : 1; // denote if it is in trash or not
+ T_REF_DECLARE()
+ char *key;
+ char data[];
+} SCacheDataNode;
+
+typedef struct STrashElem {
+ struct STrashElem *prev;
+ struct STrashElem *next;
+ SCacheDataNode * pData;
+} STrashElem;
+
+typedef struct {
+ int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
+ int64_t refreshTime;
+
+ /*
+ * to accommodate the old datanode which has the same key value of new one in hashList
+ * when an new node is put into cache, if an existed one with the same key:
+ * 1. if the old one does not be referenced, update it.
+ * 2. otherwise, move the old one to pTrash, addedTime the new one.
+ *
+ * when the node in pTrash does not be referenced, it will be release at the expired expiredTime
+ */
+ STrashElem * pTrash;
+ void * tmrCtrl;
+ void * pTimer;
+ SCacheStatis statistics;
+ SHashObj * pHashTable;
+ int numOfElemsInTrash; // number of element in trash
+ int16_t deleting; // set the deleting flag to stop refreshing ASAP.
+
+#if defined(LINUX)
+ pthread_rwlock_t lock;
+#else
+ pthread_mutex_t lock;
+#endif
+
+} SCacheObj;
/**
*
@@ -30,7 +86,7 @@ extern "C" {
* not referenced by other objects
* @return
*/
-void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTimeInSeconds);
+SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds);
/**
* add data into cache
@@ -42,28 +98,7 @@ void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTimeInSec
* @param keepTime survival time in second
* @return cached element
*/
-void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds);
-
-/**
- * remove data in cache, the data will not be removed immediately.
- * if it is referenced by other object, it will be remain in cache
- * @param handle cache object
- * @param data not the key, actually referenced data
- * @param _remove force model, reduce the ref count and move the data into
- * pTrash
- */
-void taosRemoveDataFromCache(void *handle, void **data, bool _remove);
-
-/**
- * update data in cache
- * @param handle hash object handle(pointer)
- * @param key key for hash
- * @param pData actually data
- * @param size length of data
- * @param duration survival time of this object in cache
- * @return new referenced data
- */
-void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration);
+void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds);
/**
* get data from cache
@@ -71,40 +106,56 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in
* @param key key
* @return cached data or NULL
*/
-void *taosGetDataFromCache(void *handle, char *key);
+void *taosCacheAcquireByName(void *handle, char *key);
/**
- * release all allocated memory and destroy the cache object
+ * Add one reference count for the exist data, and assign this data for a new owner.
+ * The new owner needs to invoke the taosCacheRelease when it does not need this data anymore.
+ * This procedure is a faster version of taosCacheAcquireByName function, which avoids the sideeffect of the problem of
+ * the data is moved to trash, and taosCacheAcquireByName will fail to retrieve it again.
*
* @param handle
+ * @param data
+ * @return
*/
-void taosCleanUpDataCache(void *handle);
+void *taosCacheAcquireByData(void *handle, void *data);
/**
- * move all data node into trash,clear node in trash can if it is not referenced by client
+ * transfer the ownership of data in cache to another object without increasing reference count.
* @param handle
+ * @param data
+ * @return
*/
-void taosClearDataCache(void *handle);
+void *taosCacheTransfer(void *handle, void **data);
/**
- * Add one reference count for the exist data, and assign this data for a new owner.
- * The new owner needs to invoke the taosRemoveDataFromCache when it does not need this data anymore.
- * This procedure is a faster version of taosGetDataFromCache function, which avoids the sideeffect of the problem of the
- * data is moved to trash, and taosGetDataFromCache will fail to retrieve it again.
- *
+ * remove data in cache, the data will not be removed immediately.
+ * if it is referenced by other object, it will be remain in cache
+ * @param handle cache object
+ * @param data not the key, actually referenced data
+ * @param _remove force model, reduce the ref count and move the data into
+ * pTrash
+ */
+void taosCacheRelease(void *handle, void **data, bool _remove);
+
+/**
+ * move all data node into trash, clear node in trash can if it is not referenced by any clients
* @param handle
- * @param data
- * @return
*/
-void* taosGetDataFromExists(void* handle, void* data);
+void taosCacheEmpty(SCacheObj *pCacheObj);
/**
- * transfer the ownership of data in cache to another object without increasing reference count.
+ * release all allocated memory and destroy the cache object.
+ *
+ * This function only set the deleting flag, and the specific work of clean up cache is delegated to
+ * taosCacheRefresh function, which will executed every SCacheObj->refreshTime sec.
+ *
+ * If the value of SCacheObj->refreshTime is too large, the taosCacheRefresh function may not be invoked
+ * before the main thread terminated, in which case all allocated resources are simply recycled by OS.
+ *
* @param handle
- * @param data
- * @return
*/
-void* taosTransferDataInCache(void* handle, void** data);
+void taosCacheCleanup(SCacheObj *pCacheObj);
#ifdef __cplusplus
}
diff --git a/src/util/inc/hash.h b/src/util/inc/hash.h
index 8aa4da85f7bc41513312b0ce50715bee636073b4..1bbc8dcf5c229112b421de9a4b5ef4f056a39438 100644
--- a/src/util/inc/hash.h
+++ b/src/util/inc/hash.h
@@ -23,17 +23,18 @@ extern "C" {
#include "hashfunc.h"
#define HASH_MAX_CAPACITY (1024 * 1024 * 16)
-#define HASH_VALUE_IN_TRASH (-1)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
+typedef void (*_hash_free_fn_t)(void *param);
+
typedef struct SHashNode {
- char *key; // null-terminated string
+ char *key;
union {
struct SHashNode * prev;
struct SHashEntry *prev1;
};
-
+
struct SHashNode *next;
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
uint32_t keyLen; // length of the key
@@ -46,18 +47,27 @@ typedef struct SHashEntry {
} SHashEntry;
typedef struct SHashObj {
- SHashEntry **hashList;
- size_t capacity; // number of slots
- size_t size; // number of elements in hash table
- _hash_fn_t hashFp; // hash function
-
-#if defined (LINUX)
- pthread_rwlock_t* lock;
+ SHashEntry ** hashList;
+ size_t capacity; // number of slots
+ size_t size; // number of elements in hash table
+ _hash_fn_t hashFp; // hash function
+ _hash_free_fn_t freeFp; // hash node free callback function
+
+#if defined(LINUX)
+ pthread_rwlock_t *lock;
#else
- pthread_mutex_t* lock;
+ pthread_mutex_t *lock;
#endif
} SHashObj;
+typedef struct SHashMutableIterator {
+ SHashObj * pHashObj;
+ int32_t entryIndex;
+ SHashNode *pCur;
+ SHashNode *pNext; // current node can be deleted for mutable iterator, so keep the next one before return current
+ int32_t num; // already check number of elements in hash table
+} SHashMutableIterator;
+
/**
* init the hash table
*
@@ -102,7 +112,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen);
* @param key
* @param keyLen
*/
-void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen);
+void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen);
/**
* clean up hash table
@@ -110,6 +120,41 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen);
*/
void taosHashCleanup(SHashObj *pHashObj);
+/**
+ * Set the free callback function
+ * This function if set will be invoked right before freeing each hash node
+ * @param pHashObj
+ */
+void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp);
+
+/**
+ *
+ * @param pHashObj
+ * @return
+ */
+SHashMutableIterator* taosHashCreateIter(SHashObj *pHashObj);
+
+/**
+ *
+ * @param iter
+ * @return
+ */
+bool taosHashIterNext(SHashMutableIterator *iter);
+
+/**
+ *
+ * @param iter
+ * @return
+ */
+void *taosHashIterGet(SHashMutableIterator *iter);
+
+/**
+ *
+ * @param iter
+ * @return
+ */
+void* taosHashDestroyIter(SHashMutableIterator* iter);
+
/**
*
* @param pHashObj
diff --git a/src/util/inc/tref.h b/src/util/inc/tref.h
new file mode 100644
index 0000000000000000000000000000000000000000..9483c1cc35e6d01c7b49c993d34f0c03ad950fdd
--- /dev/null
+++ b/src/util/inc/tref.h
@@ -0,0 +1,58 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifndef TDENGINE_TREF_H
+#define TDENGINE_TREF_H
+
+#include "os.h"
+
+typedef void (*_ref_fn_t)(const void* pObj);
+
+#define T_REF_DECLARE() \
+ struct { \
+ int16_t val; \
+ } _ref;
+
+#define T_REF_REGISTER_FUNC(s, e) \
+ struct { \
+ _ref_fn_t start; \
+ _ref_fn_t end; \
+ } _ref_func = {.begin = (s), .end = (e)};
+
+#define T_REF_INC(x) (atomic_add_fetch_16(&((x)->_ref.val), 1));
+
+#define T_REF_INC_WITH_CB(x, p) \
+ do { \
+ int32_t v = atomic_add_fetch_32(&((x)->_ref.val), 1); \
+ if (v == 1 && (p)->_ref_func.begin != NULL) { \
+ (p)->_ref_func.begin((x)); \
+ } \
+ } while (0)
+
+#define T_REF_DEC(x) (atomic_sub_fetch_16(&((x)->_ref.val), 1));
+
+#define T_REF_DEC_WITH_CB(x, p) \
+ do { \
+ int32_t v = atomic_sub_fetch_16(&((x)->_ref.val), 1); \
+ if (v == 0 && (p)->_ref_func.end != NULL) { \
+ (p)->_ref_func.end((x)); \
+ } \
+ } while (0)
+
+#define T_REF_VAL_CHECK(x) assert((x)->_ref.val >= 0);
+
+#define T_REF_VAL_GET(x) (x)->_ref.val
+
+#endif // TDENGINE_TREF_H
diff --git a/src/util/src/hash.c b/src/util/src/hash.c
index c69fbb772314d6a3569ee38ac043eac7fa413229..28af28e507c167d1ef2087ae3fae0e6dd9511607 100644
--- a/src/util/src/hash.c
+++ b/src/util/src/hash.c
@@ -24,8 +24,8 @@ static FORCE_INLINE void __wr_lock(void *lock) {
if (lock == NULL) {
return;
}
-
-#if defined (LINUX)
+
+#if defined(LINUX)
pthread_rwlock_wrlock(lock);
#else
pthread_mutex_lock(lock);
@@ -36,8 +36,8 @@ static FORCE_INLINE void __rd_lock(void *lock) {
if (lock == NULL) {
return;
}
-
-#if defined (LINUX)
+
+#if defined(LINUX)
pthread_rwlock_rdlock(lock);
#else
pthread_mutex_lock(lock);
@@ -48,8 +48,8 @@ static FORCE_INLINE void __unlock(void *lock) {
if (lock == NULL) {
return;
}
-
-#if defined (LINUX)
+
+#if defined(LINUX)
pthread_rwlock_unlock(lock);
#else
pthread_mutex_unlock(lock);
@@ -60,8 +60,8 @@ static FORCE_INLINE int32_t __lock_init(void *lock) {
if (lock == NULL) {
return 0;
}
-
-#if defined (LINUX)
+
+#if defined(LINUX)
return pthread_rwlock_init(lock, NULL);
#else
return pthread_mutex_init(lock, NULL);
@@ -72,8 +72,8 @@ static FORCE_INLINE void __lock_destroy(void *lock) {
if (lock == NULL) {
return;
}
-
-#if defined (LINUX)
+
+#if defined(LINUX)
pthread_rwlock_destroy(lock);
#else
pthread_mutex_destroy(lock);
@@ -107,7 +107,7 @@ static void doUpdateHashTable(SHashObj *pHashObj, SHashNode *pNode) {
/**
* get SHashNode from hashlist, nodes from trash are not included.
- * @param pHashObj Cache objection
+ * @param pHashObj Cache objection
* @param key key for hash
* @param keyLen key length
* @return
@@ -155,24 +155,24 @@ static void taosHashTableResize(SHashObj *pHashObj) {
int32_t newSize = pHashObj->capacity << 1U;
if (newSize > HASH_MAX_CAPACITY) {
- pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached", pHashObj->capacity,
- HASH_MAX_CAPACITY);
+ pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
+ pHashObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
- SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry*) * newSize);
+ SHashEntry **pNewEntry = realloc(pHashObj->hashList, sizeof(SHashEntry *) * newSize);
if (pNewEntry == NULL) {
pTrace("cache resize failed due to out of memory, capacity remain:%d", pHashObj->capacity);
return;
}
pHashObj->hashList = pNewEntry;
- for(int32_t i = pHashObj->capacity; i < newSize; ++i) {
+ for (int32_t i = pHashObj->capacity; i < newSize; ++i) {
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
}
-
+
pHashObj->capacity = newSize;
for (int32_t i = 0; i < pHashObj->capacity; ++i) {
@@ -182,7 +182,7 @@ static void taosHashTableResize(SHashObj *pHashObj) {
if (pNode != NULL) {
assert(pNode->prev1 == pEntry && pEntry->num > 0);
}
-
+
while (pNode) {
int32_t j = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
if (j == i) { // this key resides in the same slot, no need to relocate it
@@ -192,13 +192,13 @@ static void taosHashTableResize(SHashObj *pHashObj) {
// remove from current slot
assert(pNode->prev1 != NULL);
-
- if (pNode->prev1 == pEntry) { // first node of the overflow linked list
+
+ if (pNode->prev1 == pEntry) { // first node of the overflow linked list
pEntry->next = pNode->next;
} else {
pNode->prev->next = pNode->next;
}
-
+
pEntry->num--;
assert(pEntry->num >= 0);
@@ -214,13 +214,13 @@ static void taosHashTableResize(SHashObj *pHashObj) {
if (pNewIndexEntry->next != NULL) {
assert(pNewIndexEntry->next->prev1 == pNewIndexEntry);
-
+
pNewIndexEntry->next->prev = pNode;
}
-
+
pNode->next = pNewIndexEntry->next;
pNode->prev1 = pNewIndexEntry;
-
+
pNewIndexEntry->next = pNode;
pNewIndexEntry->num++;
@@ -258,14 +258,14 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
pHashObj->hashFp = fn;
- pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry*));
+ pHashObj->hashList = (SHashEntry **)calloc(pHashObj->capacity, sizeof(SHashEntry *));
if (pHashObj->hashList == NULL) {
free(pHashObj);
pError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
-
- for(int32_t i = 0; i < pHashObj->capacity; ++i) {
+
+ for (int32_t i = 0; i < pHashObj->capacity; ++i) {
pHashObj->hashList[i] = calloc(1, sizeof(SHashEntry));
}
@@ -276,7 +276,7 @@ SHashObj *taosHashInit(size_t capacity, _hash_fn_t fn, bool threadsafe) {
pHashObj->lock = calloc(1, sizeof(pthread_mutex_t));
#endif
}
-
+
if (__lock_init(pHashObj->lock) != 0) {
free(pHashObj->hashList);
free(pHashObj);
@@ -347,7 +347,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
int32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
SHashEntry *pEntry = pHashObj->hashList[index];
-
+
pNode->next = pEntry->next;
if (pEntry->next) {
@@ -356,7 +356,7 @@ static void doAddToHashTable(SHashObj *pHashObj, SHashNode *pNode) {
pEntry->next = pNode;
pNode->prev1 = pEntry;
-
+
pEntry->num++;
pHashObj->size++;
}
@@ -365,7 +365,7 @@ size_t taosHashGetSize(const SHashObj *pHashObj) {
if (pHashObj == NULL) {
return 0;
}
-
+
return pHashObj->size;
}
@@ -430,7 +430,7 @@ void *taosHashGet(SHashObj *pHashObj, const char *key, size_t keyLen) {
void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) {
__wr_lock(pHashObj->lock);
- uint32_t val = 0;
+ uint32_t val = 0;
SHashNode *pNode = doGetNodeFromHashTable(pHashObj, key, keyLen, &val);
if (pNode == NULL) {
__unlock(pHashObj->lock);
@@ -446,13 +446,13 @@ void taosHashRemove(SHashObj *pHashObj, const char *key, size_t keyLen) {
pNode->prev->next = pNext;
}
}
-
+
if (pNext != NULL) {
pNext->prev = pNode->prev;
}
uint32_t index = HASH_INDEX(pNode->hashVal, pHashObj->capacity);
-
+
SHashEntry *pEntry = pHashObj->hashList[index];
pEntry->num--;
@@ -483,10 +483,14 @@ void taosHashCleanup(SHashObj *pHashObj) {
while (pNode) {
pNext = pNode->next;
+ if (pHashObj->freeFp) {
+ pHashObj->freeFp(pNode->data);
+ }
+
free(pNode);
pNode = pNext;
}
-
+
tfree(pEntry);
}
@@ -496,24 +500,122 @@ void taosHashCleanup(SHashObj *pHashObj) {
__unlock(pHashObj->lock);
__lock_destroy(pHashObj->lock);
+ tfree(pHashObj->lock);
memset(pHashObj, 0, sizeof(SHashObj));
free(pHashObj);
}
+void taosHashSetFreecb(SHashObj *pHashObj, _hash_free_fn_t freeFp) {
+ if (pHashObj == NULL || freeFp == NULL) {
+ return;
+ }
+
+ pHashObj->freeFp = freeFp;
+}
+
+SHashMutableIterator *taosHashCreateIter(SHashObj *pHashObj) {
+ SHashMutableIterator *pIter = calloc(1, sizeof(SHashMutableIterator));
+ if (pIter == NULL) {
+ return NULL;
+ }
+
+ pIter->pHashObj = pHashObj;
+}
+
+static SHashNode *getNextHashNode(SHashMutableIterator *pIter) {
+ assert(pIter != NULL);
+
+ while (pIter->entryIndex < pIter->pHashObj->capacity) {
+ SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
+ if (pEntry->next == NULL) {
+ pIter->entryIndex++;
+ continue;
+ }
+
+ return pEntry->next;
+ }
+
+ return NULL;
+}
+
+bool taosHashIterNext(SHashMutableIterator *pIter) {
+ if (pIter == NULL) {
+ return false;
+ }
+
+ size_t size = taosHashGetSize(pIter->pHashObj);
+ if (size == 0 || pIter->num >= size) {
+ return false;
+ }
+
+ // check the first one
+ if (pIter->num == 0) {
+ assert(pIter->pCur == NULL && pIter->pNext == NULL);
+
+ while (1) {
+ SHashEntry *pEntry = pIter->pHashObj->hashList[pIter->entryIndex];
+ if (pEntry->next == NULL) {
+ pIter->entryIndex++;
+ continue;
+ }
+
+ pIter->pCur = pEntry->next;
+
+ if (pIter->pCur->next) {
+ pIter->pNext = pIter->pCur->next;
+ } else {
+ pIter->pNext = getNextHashNode(pIter);
+ }
+
+ break;
+ }
+
+ pIter->num++;
+ return true;
+ } else {
+ assert(pIter->pCur != NULL);
+ if (pIter->pNext) {
+ pIter->pCur = pIter->pNext;
+ } else { // no more data in the hash list
+ return false;
+ }
+
+ pIter->num++;
+
+ if (pIter->pCur->next) {
+ pIter->pNext = pIter->pCur->next;
+ } else {
+ pIter->pNext = getNextHashNode(pIter);
+ }
+
+ return true;
+ }
+}
+
+void *taosHashIterGet(SHashMutableIterator *iter) { return (iter == NULL) ? NULL : iter->pCur->data; }
+
+void *taosHashDestroyIter(SHashMutableIterator *iter) {
+ if (iter == NULL) {
+ return NULL;
+ }
+
+ free(iter);
+}
+
// for profile only
-int32_t taosHashGetMaxOverflowLinkLength(const SHashObj* pHashObj) {
+int32_t taosHashGetMaxOverflowLinkLength(const SHashObj *pHashObj) {
if (pHashObj == NULL || pHashObj->size == 0) {
return 0;
}
-
+
int32_t num = 0;
-
- for(int32_t i = 0; i < pHashObj->size; ++i) {
+
+ for (int32_t i = 0; i < pHashObj->size; ++i) {
SHashEntry *pEntry = pHashObj->hashList[i];
if (num < pEntry->num) {
num = pEntry->num;
}
}
-
+
return num;
}