提交 8504daea 编写于 作者: H hjxilinx

refactor cache module based on new hash table and reference count management module

上级 98b4296b
......@@ -13,86 +13,17 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "hashfunc.h"
#include "tcache.h"
#include "hash.h"
#include "hashfunc.h"
#include "tlog.h"
#include "ttime.h"
#include "ttimer.h"
#include "tutil.h"
#define HASH_MAX_CAPACITY (1024*1024*16)
#define HASH_VALUE_IN_TRASH (-1)
#define HASH_DEFAULT_LOAD_FACTOR (0.75)
#define HASH_INDEX(v, c) ((v) & ((c)-1))
/**
* todo: refactor to extract the hash table out of cache structure
*/
typedef struct SCacheStatis {
int64_t missCount;
int64_t hitCount;
int64_t totalAccess;
int64_t refreshCount;
int32_t numOfCollision;
int32_t numOfResize;
int64_t resizeTime;
} SCacheStatis;
typedef struct _cache_node_t {
char * key; // null-terminated string
struct _cache_node_t *prev;
struct _cache_node_t *next;
uint64_t addTime; // the time when this element is added or updated into cache
uint64_t time; // end time when this element should be remove from cache
uint64_t signature;
/*
* reference count for this object
* if this value is larger than 0, this value will never be released
*/
uint32_t refCount;
uint32_t hashVal; // the hash value of key, if hashVal == HASH_VALUE_IN_TRASH, this node is moved to trash
uint32_t nodeSize; // allocated size for current SDataNode
char data[];
} SDataNode;
typedef uint32_t (*_hashFunc)(const char *, uint32_t);
typedef struct {
SDataNode **hashList;
int capacity;
int size;
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime;
/*
* to accommodate the old datanode which has the same key value of new one in hashList
* when an new node is put into cache, if an existed one with the same key:
* 1. if the old one does not be referenced, update it.
* 2. otherwise, move the old one to pTrash, add the new one.
*
* when the node in pTrash does not be referenced, it will be release at the expired time
*/
SDataNode * pTrash;
void * tmrCtrl;
void * pTimer;
SCacheStatis statistics;
_hashFunc hashFp;
int numOfElemsInTrash; // number of element in trash
int16_t deleting; // set the deleting flag to stop refreshing asap.
#if defined LINUX
pthread_rwlock_t lock;
#else
pthread_mutex_t lock;
#endif
} SCacheObj;
static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) {
#if defined LINUX
#if defined(LINUX)
pthread_rwlock_wrlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->lock);
......@@ -100,7 +31,7 @@ static FORCE_INLINE void __cache_wr_lock(SCacheObj *pObj) {
}
static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) {
#if defined LINUX
#if defined(LINUX)
pthread_rwlock_rdlock(&pObj->lock);
#else
pthread_mutex_lock(&pObj->lock);
......@@ -108,7 +39,7 @@ static FORCE_INLINE void __cache_rd_lock(SCacheObj *pObj) {
}
static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) {
#if defined LINUX
#if defined(LINUX)
pthread_rwlock_unlock(&pObj->lock);
#else
pthread_mutex_unlock(&pObj->lock);
......@@ -116,7 +47,7 @@ static FORCE_INLINE void __cache_unlock(SCacheObj *pObj) {
}
static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) {
#if defined LINUX
#if defined(LINUX)
return pthread_rwlock_init(&pObj->lock, NULL);
#else
return pthread_mutex_init(&pObj->lock, NULL);
......@@ -124,19 +55,16 @@ static FORCE_INLINE int32_t __cache_lock_init(SCacheObj *pObj) {
}
static FORCE_INLINE void __cache_lock_destroy(SCacheObj *pObj) {
#if defined LINUX
#if defined(LINUX)
pthread_rwlock_destroy(&pObj->lock);
#else
pthread_mutex_destroy(&pObj->lock);
#endif
}
static FORCE_INLINE int32_t taosHashTableLength(int32_t length) {
int32_t trueLength = MIN(length, HASH_MAX_CAPACITY);
int32_t i = 4;
while (i < trueLength) i = (i << 1);
return i;
static FORCE_INLINE void taosFreeNode(void *data) {
SCacheDataNode *pNode = *(SCacheDataNode **)data;
free(pNode);
}
/**
......@@ -145,86 +73,83 @@ static FORCE_INLINE int32_t taosHashTableLength(int32_t length) {
* @param pData actually data. required a consecutive memory block, no pointer is allowed
* in pData. Pointer copy causes memory access error.
* @param size size of block
* @param lifespan total survial time from now
* @return SDataNode
* @param lifespan total survial expiredTime from now
* @return SCacheDataNode
*/
static SDataNode *taosCreateHashNode(const char *key, uint32_t keyLen, const char *pData, size_t dataSize,
uint64_t lifespan) {
size_t totalSize = dataSize + sizeof(SDataNode) + keyLen;
static SCacheDataNode *taosCreateHashNode(const char *key, size_t keyLen, const char *pData, size_t size,
uint64_t duration) {
size_t totalSize = size + sizeof(SCacheDataNode) + keyLen;
SDataNode *pNewNode = calloc(1, totalSize);
SCacheDataNode *pNewNode = calloc(1, totalSize);
if (pNewNode == NULL) {
pError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
memcpy(pNewNode->data, pData, dataSize);
pNewNode->addTime = (uint64_t)taosGetTimestampMs();
pNewNode->time = pNewNode->addTime + lifespan;
memcpy(pNewNode->data, pData, size);
pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + size;
pNewNode->keySize = keyLen;
memcpy(pNewNode->key, key, keyLen);
pNewNode->key = pNewNode->data + dataSize;
strcpy(pNewNode->key, key);
pNewNode->addedTime = (uint64_t)taosGetTimestampMs();
pNewNode->expiredTime = pNewNode->addedTime + duration;
pNewNode->signature = (uint64_t)pNewNode;
pNewNode->nodeSize = (uint32_t)totalSize;
pNewNode->size = (uint32_t)totalSize;
return pNewNode;
}
/**
* hash key function
*
* @param key key string
* @param len length of key
* @return hash value
*/
static FORCE_INLINE uint32_t taosHashKey(const char *key, uint32_t len) { return MurmurHash3_32(key, len); }
/**
* add object node into trash, and this object is closed for referencing if it is add to trash
* addedTime object node into trash, and this object is closed for referencing if it is addedTime to trash
* It will be removed until the pNode->refCount == 0
* @param pObj Cache object
* @param pNode Cache slot object
*/
static void taosAddToTrash(SCacheObj *pObj, SDataNode *pNode) {
if (pNode->hashVal == HASH_VALUE_IN_TRASH) { /* node is already in trash */
static void taosAddToTrash(SCacheObj *pObj, SCacheDataNode *pNode) {
if (pNode->inTrash) { /* node is already in trash */
return;
}
pNode->next = pObj->pTrash;
STrashElem *pElem = calloc(1, sizeof(STrashElem));
pElem->pData = pNode;
pElem->next = pObj->pTrash;
if (pObj->pTrash) {
pObj->pTrash->prev = pNode;
pObj->pTrash->prev = pElem;
}
pNode->prev = NULL;
pObj->pTrash = pNode;
pElem->prev = NULL;
pObj->pTrash = pElem;
pNode->hashVal = HASH_VALUE_IN_TRASH;
pNode->inTrash = true;
pObj->numOfElemsInTrash++;
pTrace("key:%s %p move to trash, numOfElem in trash:%d", pNode->key, pNode, pObj->numOfElemsInTrash);
}
static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) {
if (pNode->signature != (uint64_t)pNode) {
pError("key:sig:%d %p data has been released, ignore", pNode->signature, pNode);
static void taosRemoveFromTrash(SCacheObj *pObj, STrashElem *pElem) {
if (pElem->pData->signature != (uint64_t)pElem->pData) {
pError("key:sig:%d %p data has been released, ignore", pElem->pData->signature, pElem->pData);
return;
}
pObj->numOfElemsInTrash--;
if (pNode->prev) {
pNode->prev->next = pNode->next;
} else {
/* pnode is the header, update header */
pObj->pTrash = pNode->next;
if (pElem->prev) {
pElem->prev->next = pElem->next;
} else { /* pnode is the header, update header */
pObj->pTrash = pElem->next;
}
if (pNode->next) {
pNode->next->prev = pNode->prev;
if (pElem->next) {
pElem->next->prev = pElem->prev;
}
pNode->signature = 0;
free(pNode);
pElem->pData->signature = 0;
free(pElem->pData);
free(pElem);
}
/**
* remove nodes in trash with refCount == 0 in cache
......@@ -233,7 +158,7 @@ static void taosRemoveFromTrash(SCacheObj *pObj, SDataNode *pNode) {
* @param force force model, if true, remove data in trash without check refcount.
* may cause corruption. So, forece model only applys before cache is closed
*/
static void taosClearCacheTrash(SCacheObj *pObj, bool force) {
static void taosTrashEmpty(SCacheObj *pObj, bool force) {
__cache_wr_lock(pObj);
if (pObj->numOfElemsInTrash == 0) {
......@@ -246,25 +171,23 @@ static void taosClearCacheTrash(SCacheObj *pObj, bool force) {
return;
}
SDataNode *pNode = pObj->pTrash;
STrashElem *pElem = pObj->pTrash;
while (pNode) {
if (pNode->refCount < 0) {
pError("key:%s %p in trash released more than referenced, removed", pNode->key, pNode);
pNode->refCount = 0;
while (pElem) {
T_REF_VAL_CHECK(pElem->pData);
if (pElem->next == pElem) {
pElem->next = NULL;
}
if (pNode->next == pNode) {
pNode->next = NULL;
}
if (force || (T_REF_VAL_GET(pElem->pData) == 0)) {
pTrace("key:%s %p removed from trash. numOfElem in trash:%d", pElem->pData->key, pElem->pData,
pObj->numOfElemsInTrash - 1);
STrashElem *p = pElem;
if (force || (pNode->refCount == 0)) {
pTrace("key:%s %p removed from trash. numOfElem in trash:%d", pNode->key, pNode, pObj->numOfElemsInTrash - 1)
SDataNode *pTmp = pNode;
pNode = pNode->next;
taosRemoveFromTrash(pObj, pTmp);
pElem = pElem->next;
taosRemoveFromTrash(pObj, p);
} else {
pNode = pNode->next;
pElem = pElem->next;
}
}
......@@ -272,197 +195,20 @@ static void taosClearCacheTrash(SCacheObj *pObj, bool force) {
__cache_unlock(pObj);
}
/**
* add data node into cache
* @param pObj cache object
* @param pNode Cache slot object
*/
static void taosAddNodeToHashTable(SCacheObj *pObj, SDataNode *pNode) {
int32_t slotIndex = HASH_INDEX(pNode->hashVal, pObj->capacity);
pNode->next = pObj->hashList[slotIndex];
if (pObj->hashList[slotIndex] != NULL) {
(pObj->hashList[slotIndex])->prev = pNode;
pObj->statistics.numOfCollision++;
}
pObj->hashList[slotIndex] = pNode;
pObj->size++;
pObj->totalSize += pNode->nodeSize;
pTrace("key:%s %p add to hash table", pNode->key, pNode);
}
/**
* remove node in hash list
* @param pObj
* @param pNode
*/
static void taosRemoveNodeInHashTable(SCacheObj *pObj, SDataNode *pNode) {
if (pNode->hashVal == HASH_VALUE_IN_TRASH) return;
SDataNode *pNext = pNode->next;
if (pNode->prev != NULL) {
pNode->prev->next = pNext;
} else { /* the node is in hashlist, remove it */
pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNext;
}
if (pNext != NULL) {
pNext->prev = pNode->prev;
}
pObj->size--;
pObj->totalSize -= pNode->nodeSize;
pNode->next = NULL;
pNode->prev = NULL;
pTrace("key:%s %p remove from hashtable", pNode->key, pNode);
}
/**
* in-place node in hashlist
* @param pObj cache object
* @param pNode data node
*/
static void taosUpdateInHashTable(SCacheObj *pObj, SDataNode *pNode) {
assert(pNode->hashVal >= 0);
if (pNode->prev) {
pNode->prev->next = pNode;
} else {
pObj->hashList[HASH_INDEX(pNode->hashVal, pObj->capacity)] = pNode;
}
if (pNode->next) {
(pNode->next)->prev = pNode;
}
pTrace("key:%s %p update hashtable", pNode->key, pNode);
}
/**
* get SDataNode from hashlist, nodes from trash are not included.
* @param pObj Cache objection
* @param key key for hash
* @param keyLen key length
* @return
*/
static SDataNode *taosGetNodeFromHashTable(SCacheObj *pObj, const char *key, uint32_t keyLen) {
uint32_t hash = (*pObj->hashFp)(key, keyLen);
int32_t slot = HASH_INDEX(hash, pObj->capacity);
SDataNode *pNode = pObj->hashList[slot];
while (pNode) {
if (strcmp(pNode->key, key) == 0) break;
pNode = pNode->next;
}
if (pNode) {
assert(HASH_INDEX(pNode->hashVal, pObj->capacity) == slot);
}
return pNode;
}
/**
* resize the hash list if the threshold is reached
*
* @param pObj
*/
static void taosHashTableResize(SCacheObj *pObj) {
if (pObj->size < pObj->capacity * HASH_DEFAULT_LOAD_FACTOR) {
return;
}
// double the original capacity
pObj->statistics.numOfResize++;
SDataNode *pNode = NULL;
SDataNode *pNext = NULL;
int32_t newSize = pObj->capacity << 1;
if (newSize > HASH_MAX_CAPACITY) {
pTrace("current capacity:%d, maximum capacity:%d, no resize applied due to limitation is reached",
pObj->capacity, HASH_MAX_CAPACITY);
return;
}
int64_t st = taosGetTimestampUs();
SDataNode **pList = realloc(pObj->hashList, sizeof(SDataNode *) * newSize);
if (pList == NULL) {
pTrace("cache resize failed due to out of memory, capacity remain:%d", pObj->capacity);
return;
}
pObj->hashList = pList;
int32_t inc = newSize - pObj->capacity;
memset(&pObj->hashList[pObj->capacity], 0, inc * sizeof(SDataNode *));
pObj->capacity = newSize;
for (int32_t i = 0; i < pObj->capacity; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
int32_t j = HASH_INDEX(pNode->hashVal, pObj->capacity);
if (j == i) { // this key resides in the same slot, no need to relocate it
pNode = pNode->next;
} else {
pNext = pNode->next;
// remove from current slot
if (pNode->prev != NULL) {
pNode->prev->next = pNode->next;
} else {
pObj->hashList[i] = pNode->next;
}
if (pNode->next != NULL) {
(pNode->next)->prev = pNode->prev;
}
// added into new slot
pNode->next = NULL;
pNode->prev = NULL;
pNode->next = pObj->hashList[j];
if (pObj->hashList[j] != NULL) {
(pObj->hashList[j])->prev = pNode;
}
pObj->hashList[j] = pNode;
// continue
pNode = pNext;
}
}
}
int64_t et = taosGetTimestampUs();
pObj->statistics.resizeTime += (et - st);
pTrace("cache resize completed, new capacity:%d, load factor:%f, elapsed time:%fms", pObj->capacity,
((double)pObj->size) / pObj->capacity, (et - st) / 1000.0);
}
/**
* release node
* @param pObj cache object
* @param pNode data node
*/
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SDataNode *pNode) {
taosRemoveNodeInHashTable(pObj, pNode);
static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SCacheDataNode *pNode) {
if (pNode->signature != (uint64_t)pNode) {
pError("key:%s, %p data is invalid, or has been released", pNode->key, pNode);
return;
}
pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->size, pObj->totalSize);
pNode->signature = 0;
taosHashRemove(pObj->pHashTable, pNode->key, pNode->keySize);
pTrace("key:%s is removed from cache,total:%d,size:%ldbytes", pNode->key, pObj->totalSize, pObj->totalSize);
free(pNode);
}
......@@ -471,8 +217,8 @@ static FORCE_INLINE void taosCacheReleaseNode(SCacheObj *pObj, SDataNode *pNode)
* @param pObj
* @param pNode
*/
static FORCE_INLINE void taosCacheMoveNodeToTrash(SCacheObj *pObj, SDataNode *pNode) {
taosRemoveNodeInHashTable(pObj, pNode);
static FORCE_INLINE void taosCacheMoveToTrash(SCacheObj *pObj, SCacheDataNode *pNode) {
taosHashRemove(pObj->pHashTable, pNode->key, pNode->keySize);
taosAddToTrash(pObj, pNode);
}
......@@ -486,15 +232,15 @@ static FORCE_INLINE void taosCacheMoveNodeToTrash(SCacheObj *pObj, SDataNode *pN
* @param dataSize
* @return
*/
static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *key, int32_t keyLen, void *pData,
uint32_t dataSize, uint64_t keepTime) {
SDataNode *pNewNode = NULL;
static SCacheDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SCacheDataNode *pNode, char *key, int32_t keyLen,
void *pData, uint32_t dataSize, uint64_t duration) {
SCacheDataNode *pNewNode = NULL;
// only a node is not referenced by any other object, in-place update it
if (pNode->refCount == 0) {
size_t newSize = sizeof(SDataNode) + dataSize + keyLen;
if (T_REF_VAL_GET(pNode) == 0) {
size_t newSize = sizeof(SCacheDataNode) + dataSize + keyLen;
pNewNode = (SDataNode *)realloc(pNode, newSize);
pNewNode = (SCacheDataNode *)realloc(pNode, newSize);
if (pNewNode == NULL) {
return NULL;
}
......@@ -502,40 +248,37 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
pNewNode->signature = (uint64_t)pNewNode;
memcpy(pNewNode->data, pData, dataSize);
pNewNode->key = pNewNode->data + dataSize;
strcpy(pNewNode->key, key);
pNewNode->key = (char *)pNewNode + sizeof(SCacheDataNode) + dataSize;
pNewNode->keySize = keyLen;
memcpy(pNewNode->key, key, keyLen);
// update the timestamp information for updated key/value
pNewNode->addTime = taosGetTimestampMs();
pNewNode->time = pNewNode->addTime + keepTime;
pNewNode->addedTime = taosGetTimestampMs();
pNewNode->expiredTime = pNewNode->addedTime + duration;
atomic_add_fetch_32(&pNewNode->refCount, 1);
T_REF_INC(pNewNode);
// the address of this node may be changed, so the prev and next element should update the corresponding pointer
taosUpdateInHashTable(pObj, pNewNode);
taosHashPut(pObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
} else {
int32_t hashVal = pNode->hashVal;
taosCacheMoveNodeToTrash(pObj, pNode);
taosCacheMoveToTrash(pObj, pNode);
pNewNode = taosCreateHashNode(key, keyLen, pData, dataSize, keepTime);
pNewNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration);
if (pNewNode == NULL) {
return NULL;
}
atomic_add_fetch_32(&pNewNode->refCount, 1);
T_REF_INC(pNewNode);
assert(hashVal == (*pObj->hashFp)(key, keyLen - 1));
pNewNode->hashVal = hashVal;
// add new element to hashtable
taosAddNodeToHashTable(pObj, pNewNode);
// addedTime new element to hashtable
taosHashPut(pObj->pHashTable, key, keyLen, &pNewNode, sizeof(void *));
}
return pNewNode;
}
/**
* add data into hash table
* addedTime data into hash table
* @param key
* @param pData
* @param size
......@@ -544,206 +287,31 @@ static SDataNode *taosUpdateCacheImpl(SCacheObj *pObj, SDataNode *pNode, char *k
* @param pNode
* @return
*/
static FORCE_INLINE SDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, uint32_t keyLen, const char *pData,
int dataSize, uint64_t lifespan) {
SDataNode *pNode = taosCreateHashNode(key, keyLen, pData, dataSize, lifespan);
static FORCE_INLINE SCacheDataNode *taosAddToCacheImpl(SCacheObj *pObj, char *key, size_t keyLen, const void *pData,
size_t dataSize, uint64_t duration) {
SCacheDataNode *pNode = taosCreateHashNode(key, keyLen, pData, dataSize, duration);
if (pNode == NULL) {
return NULL;
}
atomic_add_fetch_32(&pNode->refCount, 1);
pNode->hashVal = (*pObj->hashFp)(key, keyLen - 1);
taosAddNodeToHashTable(pObj, pNode);
T_REF_INC(pNode);
taosHashPut(pObj->pHashTable, key, keyLen, &pNode, sizeof(void *));
return pNode;
}
/**
* add data into cache
*
* @param handle cache object
* @param key key
* @param pData cached data
* @param dataSize data size
* @param keepTime survival time in second
* @return cached element
*/
void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, int keepTime) {
SDataNode *pNode;
SCacheObj *pObj;
pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->capacity == 0) return NULL;
uint32_t keyLen = (uint32_t)strlen(key) + 1;
static void doCleanupDataCache(SCacheObj *pObj) {
__cache_wr_lock(pObj);
SDataNode *pOldNode = taosGetNodeFromHashTable(pObj, key, keyLen - 1);
if (pOldNode == NULL) { // do add to cache
// check if the threshold is reached
taosHashTableResize(pObj);
pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, keepTime * 1000L);
if (NULL != pNode) {
pTrace(
"key:%s %p added into cache, slot:%d, addTime:%" PRIu64 ", expireTime:%" PRIu64 ", cache total:%d, "
"size:%" PRId64 " bytes, collision:%d",
pNode->key, pNode, HASH_INDEX(pNode->hashVal, pObj->capacity), pNode->addTime, pNode->time, pObj->size,
pObj->totalSize, pObj->statistics.numOfCollision);
}
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pObj, pOldNode, key, keyLen, pData, dataSize, keepTime * 1000L);
pTrace("key:%s %p exist in cache, updated", key, pNode);
if (taosHashGetSize(pObj->pHashTable) > 0) {
taosHashCleanup(pObj->pHashTable);
}
__cache_unlock(pObj);
return (pNode != NULL) ? pNode->data : NULL;
}
static FORCE_INLINE void taosDecRef(SDataNode *pNode) {
if (pNode == NULL) {
return;
}
if (pNode->refCount > 0) {
atomic_sub_fetch_32(&pNode->refCount, 1);
pTrace("key:%s is released by app.refcnt:%d", pNode->key, pNode->refCount);
} else {
/*
* safety check.
* app may false releases cached object twice, to decrease the refcount more than acquired
*/
pError("key:%s is released by app more than referenced.refcnt:%d", pNode->key, pNode->refCount);
}
}
/**
* remove data in cache, the data will not be removed immediately.
* if it is referenced by other object, it will be remain in cache
* @param handle
* @param data
*/
void taosRemoveDataFromCache(void *handle, void **data, bool _remove) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->capacity == 0 || (*data) == NULL || (pObj->size + pObj->numOfElemsInTrash == 0)) return;
size_t offset = offsetof(SDataNode, data);
SDataNode *pNode = (SDataNode *)((char *)(*data) - offset);
if (pNode->signature != (uint64_t)pNode) {
pError("key: %p release invalid cache data", pNode);
return;
}
*data = NULL;
if (_remove) {
__cache_wr_lock(pObj);
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
// So we need to lock it in the first place.
taosDecRef(pNode);
taosCacheMoveNodeToTrash(pObj, pNode);
__cache_unlock(pObj);
} else {
taosDecRef(pNode);
}
}
/**
* get data from cache
* @param handle cache object
* @param key key
* @return cached data or NULL
*/
void *taosGetDataFromCache(void *handle, char *key) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->capacity == 0) return NULL;
uint32_t keyLen = (uint32_t)strlen(key);
__cache_rd_lock(pObj);
SDataNode *ptNode = taosGetNodeFromHashTable(handle, key, keyLen);
if (ptNode != NULL) {
atomic_add_fetch_32(&ptNode->refCount, 1);
}
__cache_unlock(pObj);
if (ptNode != NULL) {
atomic_add_fetch_32(&pObj->statistics.hitCount, 1);
pTrace("key:%s is retrieved from cache,refcnt:%d", key, ptNode->refCount);
} else {
atomic_add_fetch_32(&pObj->statistics.missCount, 1);
pTrace("key:%s not in cache,retrieved failed", key);
}
atomic_add_fetch_32(&pObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? ptNode->data : NULL;
}
/**
* update data in cache
* @param handle hash object handle(pointer)
* @param key key for hash
* @param pData actually data
* @param size length of data
* @return new referenced data
*/
void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->capacity == 0) return NULL;
SDataNode *pNew = NULL;
uint32_t keyLen = strlen(key) + 1;
__cache_wr_lock(pObj);
SDataNode *pNode = taosGetNodeFromHashTable(handle, key, keyLen - 1);
if (pNode == NULL) { // object has been released, do add operation
pNew = taosAddToCacheImpl(pObj, key, keyLen, pData, size, duration * 1000L);
pWarn("key:%s does not exist, update failed,do add to cache.total:%d,size:%ldbytes", key, pObj->size,
pObj->totalSize);
} else {
pNew = taosUpdateCacheImpl(pObj, pNode, key, keyLen, pData, size, duration * 1000L);
pTrace("key:%s updated.expireTime:%" PRIu64 ".refCnt:%d", key, pNode->time, pNode->refCount);
}
__cache_unlock(pObj);
return (pNew != NULL) ? pNew->data : NULL;
}
static void doCleanUpDataCache(SCacheObj* pObj) {
SDataNode *pNode, *pNext;
__cache_wr_lock(pObj);
if (pObj->hashList && pObj->size > 0) {
for (int i = 0; i < pObj->capacity; ++i) {
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
free(pNode);
pNode = pNext;
}
}
tfree(pObj->hashList);
}
__cache_unlock(pObj);
taosClearCacheTrash(pObj, true);
taosTrashEmpty(pObj, true);
__cache_lock_destroy(pObj);
memset(pObj, 0, sizeof(SCacheObj));
free(pObj);
}
......@@ -751,101 +319,51 @@ static void doCleanUpDataCache(SCacheObj* pObj) {
* refresh cache to remove data in both hash list and trash, if any nodes' refcount == 0, every pObj->refreshTime
* @param handle Cache object handle
*/
void taosRefreshDataCache(void *handle, void *tmrId) {
SDataNode *pNode, *pNext;
static void taosCacheRefresh(void *handle, void *tmrId) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->capacity <= 0) {
if (pObj == NULL || taosHashGetSize(pObj->pHashTable) == 0) {
pTrace("object is destroyed. no refresh retry");
return;
}
if (pObj->deleting == 1) {
doCleanUpDataCache(pObj);
doCleanupDataCache(pObj);
return;
}
uint64_t time = taosGetTimestampMs();
uint32_t numOfCheck = 0;
uint64_t expiredTime = taosGetTimestampMs();
pObj->statistics.refreshCount++;
int32_t num = pObj->size;
SHashMutableIterator *pIter = taosHashCreateIter(pObj->pHashTable);
for (int i = 0; i < pObj->capacity; ++i) {
// in deleting process, quit refreshing immediately
__cache_wr_lock(pObj);
while (taosHashIterNext(pIter)) {
if (pObj->deleting == 1) {
taosHashDestroyIter(pIter);
break;
}
__cache_wr_lock(pObj);
pNode = pObj->hashList[i];
while (pNode) {
numOfCheck++;
pNext = pNode->next;
if (pNode->time <= time && pNode->refCount <= 0) {
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
if (pNode->expiredTime <= expiredTime && T_REF_VAL_GET(pNode) <= 0) {
taosCacheReleaseNode(pObj, pNode);
}
pNode = pNext;
}
/* all data have been checked, not need to iterate further */
if (numOfCheck == num || pObj->size <= 0) {
__cache_unlock(pObj);
break;
}
__cache_unlock(pObj);
}
taosHashDestroyIter(pIter);
if (pObj->deleting == 1) { // clean up resources and abort
doCleanUpDataCache(pObj);
doCleanupDataCache(pObj);
} else {
taosClearCacheTrash(pObj, false);
taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
taosTrashEmpty(pObj, false);
taosTmrReset(taosCacheRefresh, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
}
}
/**
*
* @param handle
* @param tmrId
*/
void taosClearDataCache(void *handle) {
SDataNode *pNode, *pNext;
SCacheObj *pObj = (SCacheObj *)handle;
int32_t capacity = pObj->capacity;
for (int i = 0; i < capacity; ++i) {
__cache_wr_lock(pObj);
pNode = pObj->hashList[i];
while (pNode) {
pNext = pNode->next;
taosCacheMoveNodeToTrash(pObj, pNode);
pNode = pNext;
}
pObj->hashList[i] = NULL;
__cache_unlock(pObj);
}
taosClearCacheTrash(pObj, false);
}
/**
* @param capacity maximum slots available for hash elements
* @param tmrCtrl timer ctrl
* @param refreshTime refresh operation interval time, the maximum survival time when one element is expired and
* not referenced by other objects
* @return
*/
void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) {
if (tmrCtrl == NULL || refreshTime <= 0 || capacity <= 0) {
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTime) {
if (tmrCtrl == NULL || refreshTime <= 0) {
return NULL;
}
......@@ -855,93 +373,190 @@ void *taosInitDataCache(int capacity, void *tmrCtrl, int64_t refreshTime) {
return NULL;
}
// the max slots is not defined by user
pObj->capacity = taosHashTableLength(capacity);
assert((pObj->capacity & (pObj->capacity - 1)) == 0);
pObj->hashFp = taosHashKey;
pObj->refreshTime = refreshTime * 1000;
pObj->hashList = (SDataNode **)calloc(1, sizeof(SDataNode *) * pObj->capacity);
if (pObj->hashList == NULL) {
pObj->pHashTable = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false);
if (pObj->pHashTable == NULL) {
free(pObj);
pError("failed to allocate memory, reason:%s", strerror(errno));
return NULL;
}
// set free cache node callback function for hash table
taosHashSetFreecb(pObj->pHashTable, taosFreeNode);
pObj->refreshTime = refreshTime * 1000;
pObj->tmrCtrl = tmrCtrl;
taosTmrReset(taosRefreshDataCache, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
taosTmrReset(taosCacheRefresh, pObj->refreshTime, pObj, pObj->tmrCtrl, &pObj->pTimer);
if (__cache_lock_init(pObj) != 0) {
taosTmrStopA(&pObj->pTimer);
free(pObj->hashList);
taosHashCleanup(pObj->pHashTable);
free(pObj);
pError("failed to init lock, reason:%s", strerror(errno));
return NULL;
}
return (void *)pObj;
return pObj;
}
/**
* release all allocated memory and destroy the cache object.
*
* This function only set the deleting flag, and the specific work of clean up cache is delegated to
* taosRefreshDataCache function, which will executed every SCacheObj->refreshTime sec.
*
* If the value of SCacheObj->refreshTime is too large, the taosRefreshDataCache function may not be invoked
* before the main thread terminated, in which case all allocated resources are simply recycled by OS.
*
* @param handle
*/
void taosCleanUpDataCache(void *handle) {
void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int duration) {
SCacheDataNode *pNode;
SCacheObj * pObj;
pObj = (SCacheObj *)handle;
if (pObj == NULL || pObj->pHashTable == NULL) {
return NULL;
}
size_t keyLen = strlen(key);
__cache_wr_lock(pObj);
SCacheDataNode **pt = (SCacheDataNode **)taosHashGet(pObj->pHashTable, key, keyLen);
SCacheDataNode * pOld = (pt != NULL) ? (*pt) : NULL;
if (pOld == NULL) { // do addedTime to cache
pNode = taosAddToCacheImpl(pObj, key, keyLen, pData, dataSize, duration * 1000L);
if (NULL != pNode) {
pTrace("key:%s %p added into cache, addedTime:%" PRIu64 ", expireTime:%" PRIu64 ", cache total:%d, size:%" PRId64
" bytes, collision:%d",
key, pNode, pNode->addedTime, pNode->expiredTime, dataSize, pObj->totalSize,
pObj->statistics.numOfCollision);
}
} else { // old data exists, update the node
pNode = taosUpdateCacheImpl(pObj, pOld, key, keyLen, pData, dataSize, duration * 1000L);
pTrace("key:%s %p exist in cache, updated", key, pNode);
}
__cache_unlock(pObj);
return (pNode != NULL) ? pNode->data : NULL;
}
void *taosCacheAcquireByName(void *handle, char *key) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL) {
return;
if (pObj == NULL || taosHashGetSize(pObj->pHashTable) == 0) {
return NULL;
}
uint32_t keyLen = (uint32_t)strlen(key);
__cache_rd_lock(pObj);
SCacheDataNode **ptNode = (SCacheDataNode **)taosHashGet(pObj->pHashTable, key, keyLen);
if (ptNode != NULL) {
T_REF_INC(*ptNode);
}
pObj->deleting = 1;
__cache_unlock(pObj);
if (ptNode != NULL) {
atomic_add_fetch_32(&pObj->statistics.hitCount, 1);
pTrace("key:%s is retrieved from cache,refcnt:%d", key, T_REF_VAL_GET(*ptNode));
} else {
atomic_add_fetch_32(&pObj->statistics.missCount, 1);
pTrace("key:%s not in cache,retrieved failed", key);
}
atomic_add_fetch_32(&pObj->statistics.totalAccess, 1);
return (ptNode != NULL) ? (*ptNode)->data : NULL;
}
void* taosGetDataFromExists(void* handle, void* data) {
void *taosCacheAcquireByData(void *handle, void *data) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || data == NULL) return NULL;
size_t offset = offsetof(SDataNode, data);
SDataNode *ptNode = (SDataNode *)((char *)data - offset);
size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)data - offset);
if (ptNode->signature != (uint64_t) ptNode) {
if (ptNode->signature != (uint64_t)ptNode) {
pError("key: %p the data from cache is invalid", ptNode);
return NULL;
}
int32_t ref = atomic_add_fetch_32(&ptNode->refCount, 1);
pTrace("%p add ref data in cache, refCnt:%d", data, ref)
int32_t ref = T_REF_INC(ptNode);
pTrace("%p addedTime ref data in cache, refCnt:%d", data, ref)
// the data if referenced by at least one object, so the reference count must be greater than the value of 2.
assert(ref >= 2);
return data;
}
void* taosTransferDataInCache(void* handle, void** data) {
void *taosCacheTransfer(void *handle, void **data) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || data == NULL) return NULL;
size_t offset = offsetof(SDataNode, data);
SDataNode *ptNode = (SDataNode *)((char *)(*data) - offset);
size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *ptNode = (SCacheDataNode *)((char *)(*data) - offset);
if (ptNode->signature != (uint64_t) ptNode) {
if (ptNode->signature != (uint64_t)ptNode) {
pError("key: %p the data from cache is invalid", ptNode);
return NULL;
}
assert(ptNode->refCount >= 1);
assert(T_REF_VAL_GET(ptNode) >= 1);
char* d = *data;
char *d = *data;
// clear its reference to old area
*data = NULL;
return d;
}
void taosCacheRelease(void *handle, void **data, bool _remove) {
SCacheObj *pObj = (SCacheObj *)handle;
if (pObj == NULL || (*data) == NULL || (taosHashGetSize(pObj->pHashTable) + pObj->numOfElemsInTrash == 0)) {
return;
}
size_t offset = offsetof(SCacheDataNode, data);
SCacheDataNode *pNode = (SCacheDataNode *)((char *)(*data) - offset);
if (pNode->signature != (uint64_t)pNode) {
pError("key: %p release invalid cache data", pNode);
return;
}
*data = NULL;
if (_remove) {
__cache_wr_lock(pObj);
// pNode may be released immediately by other thread after the reference count of pNode is set to 0,
// So we need to lock it in the first place.
T_REF_DEC(pNode);
taosCacheMoveToTrash(pObj, pNode);
__cache_unlock(pObj);
} else {
T_REF_DEC(pNode);
}
}
void taosCacheEmpty(SCacheObj *pCacheObj) {
SHashMutableIterator *pIter = taosHashCreateIter(pCacheObj->pHashTable);
__cache_wr_lock(pCacheObj);
while (taosHashIterNext(pIter)) {
if (pCacheObj->deleting == 1) {
taosHashDestroyIter(pIter);
break;
}
SCacheDataNode *pNode = *(SCacheDataNode **)taosHashIterGet(pIter);
taosCacheMoveToTrash(pCacheObj, pNode);
}
__cache_unlock(pCacheObj);
taosHashDestroyIter(pIter);
taosTrashEmpty(pCacheObj, false);
}
void taosCacheCleanup(SCacheObj *pCacheObj) {
if (pCacheObj == NULL) {
return;
}
pCacheObj->deleting = 1;
}
......@@ -484,7 +484,7 @@ int tscProcessLocalCmd(SSqlObj *pSql) {
pSql->res.qhandle = 0x1;
pSql->res.numOfRows = 0;
} else if (pCmd->command == TSDB_SQL_RESET_CACHE) {
taosClearDataCache(tscCacheHandle);
taosCacheEmpty(tscCacheHandle);
} else if (pCmd->command == TSDB_SQL_SERV_VERSION) {
tscProcessServerVer(pSql);
} else if (pCmd->command == TSDB_SQL_CLI_VERSION) {
......
......@@ -182,7 +182,6 @@ int tscSendMsgToServer(SSqlObj *pSql) {
}
pSql->ipList->ip[0] = inet_addr("192.168.0.1");
SSqlCmd* pCmd = &pSql->cmd;
if (pSql->cmd.command < TSDB_SQL_MGMT) {
pSql->ipList->port = tsDnodeShellPort;
......@@ -2641,7 +2640,7 @@ int tscProcessMeterMetaRsp(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
assert(pMeterMetaInfo->pMeterMeta == NULL);
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosAddDataIntoCache(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosCachePut(tscCacheHandle, pMeterMetaInfo->name, (char *)pMeta,
pMeta->contLen, tsMeterMetaKeepTimer);
// todo handle out of memory case
if (pMeterMetaInfo->pMeterMeta == NULL) return 0;
......@@ -2750,7 +2749,7 @@ int tscProcessMultiMeterMetaRsp(SSqlObj *pSql) {
int32_t size = (int32_t)(rsp - ((char *)pMeta)); // Consistent with STableMeta in cache
pMeta->index = 0;
(void)taosAddDataIntoCache(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
(void)taosCachePut(tscCacheHandle, pMeta->tableId, (char *)pMeta, size, tsMeterMetaKeepTimer);
}
pSql->res.code = TSDB_CODE_SUCCESS;
......@@ -2857,9 +2856,9 @@ int tscProcessMetricMetaRsp(SSqlObj *pSql) {
#endif
// release the used metricmeta
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosAddDataIntoCache(tscCacheHandle, name, (char *)metricMetaList[i],
pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosCachePut(tscCacheHandle, name, (char *)metricMetaList[i],
sizes[i], tsMetricMetaKeepTimer);
tfree(metricMetaList[i]);
......@@ -2917,11 +2916,11 @@ int tscProcessShowRsp(SSqlObj *pSql) {
key[0] = pCmd->msgType + 'a';
strcpy(key + 1, "showlist");
taosRemoveDataFromCache(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);
taosCacheRelease(tscCacheHandle, (void *)&(pMeterMetaInfo->pMeterMeta), false);
int32_t size = pMeta->numOfColumns * sizeof(SSchema) + sizeof(STableMeta);
pMeterMetaInfo->pMeterMeta =
(STableMeta *)taosAddDataIntoCache(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
(STableMeta *)taosCachePut(tscCacheHandle, key, (char *)pMeta, size, tsMeterMetaKeepTimer);
pCmd->numOfCols = pQueryInfo->fieldsInfo.numOfOutputCols;
SSchema *pMeterSchema = tsGetSchema(pMeterMetaInfo->pMeterMeta);
......@@ -2975,14 +2974,14 @@ int tscProcessUseDbRsp(SSqlObj *pSql) {
}
int tscProcessDropDbRsp(SSqlObj *UNUSED_PARAM(pSql)) {
taosClearDataCache(tscCacheHandle);
taosCacheEmpty(tscCacheHandle);
return 0;
}
int tscProcessDropTableRsp(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
if (pMeterMeta == NULL) {
/* not in cache, abort */
return 0;
......@@ -2996,11 +2995,11 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
* instead.
*/
tscTrace("%p force release metermeta after drop table:%s", pSql, pMeterMetaInfo->name);
taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);
taosCacheRelease(tscCacheHandle, (void **)&pMeterMeta, true);
if (pMeterMetaInfo->pMeterMeta) {
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
}
return 0;
......@@ -3009,23 +3008,23 @@ int tscProcessDropTableRsp(SSqlObj *pSql) {
int tscProcessAlterTableMsgRsp(SSqlObj *pSql) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfo(&pSql->cmd, 0, 0);
STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
if (pMeterMeta == NULL) { /* not in cache, abort */
return 0;
}
tscTrace("%p force release metermeta in cache after alter-table: %s", pSql, pMeterMetaInfo->name);
taosRemoveDataFromCache(tscCacheHandle, (void **)&pMeterMeta, true);
taosCacheRelease(tscCacheHandle, (void **)&pMeterMeta, true);
if (pMeterMetaInfo->pMeterMeta) {
bool isSuperTable = UTIL_METER_IS_SUPERTABLE(pMeterMetaInfo);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), true);
if (isSuperTable) { // if it is a super table, reset whole query cache
tscTrace("%p reset query cache since table:%s is stable", pSql, pMeterMetaInfo->name);
taosClearDataCache(tscCacheHandle);
taosCacheEmpty(tscCacheHandle);
}
}
......@@ -3151,7 +3150,7 @@ static int32_t doGetMeterMetaFromServer(SSqlObj *pSql, SMeterMetaInfo *pMeterMet
* Transfer the ownership of metermeta to the new object, instead of invoking the release/acquire routine
*/
if (code == TSDB_CODE_SUCCESS) {
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
pMeterMetaInfo->pMeterMeta = taosCacheTransfer(tscCacheHandle, (void**) &pNewMeterMetaInfo->pMeterMeta);
assert(pMeterMetaInfo->pMeterMeta != NULL);
}
......@@ -3177,10 +3176,10 @@ int tscGetMeterMeta(SSqlObj *pSql, SMeterMetaInfo *pMeterMetaInfo) {
// If this SMeterMetaInfo owns a metermeta, release it first
if (pMeterMetaInfo->pMeterMeta != NULL) {
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), false);
}
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosGetDataFromCache(tscCacheHandle, pMeterMetaInfo->name);
pMeterMetaInfo->pMeterMeta = (STableMeta *)taosCacheAcquireByName(tscCacheHandle, pMeterMetaInfo->name);
if (pMeterMetaInfo->pMeterMeta != NULL) {
STableMeta *pMeterMeta = pMeterMetaInfo->pMeterMeta;
......@@ -3244,7 +3243,7 @@ int tscRenewMeterMeta(SSqlObj *pSql, char *tableId) {
}
tscWaitingForCreateTable(pCmd);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMeterMeta), true);
code = doGetMeterMetaFromServer(pSql, pMeterMetaInfo); // todo ??
} else {
......@@ -3278,9 +3277,9 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
SMeterMetaInfo *pMeterMetaInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
tscGetMetricMetaCacheKey(pQueryInfo, tagstr, pMeterMetaInfo->pMeterMeta->uid);
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
SSuperTableMeta *ppMeta = (SSuperTableMeta *)taosCacheAcquireByName(tscCacheHandle, tagstr);
if (ppMeta == NULL) {
required = true;
break;
......@@ -3308,7 +3307,7 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
for (int32_t i = 0; i < pQueryInfo->numOfTables; ++i) {
SMeterMetaInfo *pMMInfo = tscGetMeterMetaInfoFromQueryInfo(pQueryInfo, i);
STableMeta *pMeterMeta = taosGetDataFromCache(tscCacheHandle, pMMInfo->name);
STableMeta *pMeterMeta = taosCacheAcquireByName(tscCacheHandle, pMMInfo->name);
tscAddMeterMetaInfo(pNewQueryInfo, pMMInfo->name, pMeterMeta, NULL, pMMInfo->numOfTags, pMMInfo->tagColumnIndex);
}
......@@ -3353,8 +3352,8 @@ int tscGetMetricMeta(SSqlObj *pSql, int32_t clauseIndex) {
printf("create metric key:%s, index:%d\n", tagstr, i);
#endif
taosRemoveDataFromCache(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *)taosGetDataFromCache(tscCacheHandle, tagstr);
taosCacheRelease(tscCacheHandle, (void **)&(pMeterMetaInfo->pMetricMeta), false);
pMeterMetaInfo->pMetricMeta = (SSuperTableMeta *) taosCacheAcquireByName(tscCacheHandle, tagstr);
}
}
......
......@@ -371,7 +371,7 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
pSql->sqlstr = NULL;
taos_free_result_imp(pSql, 0);
pSql->sqlstr = sqlstr;
taosClearDataCache(tscCacheHandle);
taosCacheEmpty(tscCacheHandle);
if (!tscUpdateSubscription(pSub->taos, pSub)) return NULL;
tscTrace("meter synchronization completed");
} else {
......
......@@ -186,7 +186,7 @@ void taos_init_imp() {
refreshTime = refreshTime > 2 ? 2 : refreshTime;
refreshTime = refreshTime < 1 ? 1 : refreshTime;
if (tscCacheHandle == NULL) tscCacheHandle = taosInitDataCache(tsMaxMeterConnections / 2, tscTmr, refreshTime);
if (tscCacheHandle == NULL) tscCacheHandle = taosCacheInit(tscTmr, refreshTime);
tscConnCache = taosOpenConnCache(tsMaxMeterConnections * 2, NULL/*taosCloseRpcConn*/, tscTmr, tsShellActivityTimer * 1000);
......
......@@ -505,7 +505,7 @@ void tscDestroyDataBlock(STableDataBlocks* pDataBlock) {
tfree(pDataBlock->params);
// free the refcount for metermeta
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false);
taosCacheRelease(tscCacheHandle, (void**)&(pDataBlock->pMeterMeta), false);
tfree(pDataBlock);
}
......@@ -589,9 +589,9 @@ int32_t tscCopyDataBlockToPayload(SSqlObj* pSql, STableDataBlocks* pDataBlock) {
// set the correct metermeta object, the metermeta has been locked in pDataBlocks, so it must be in the cache
if (pMeterMetaInfo->pMeterMeta != pDataBlock->pMeterMeta) {
strcpy(pMeterMetaInfo->name, pDataBlock->tableId);
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), false);
pMeterMetaInfo->pMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pDataBlock->pMeterMeta);
pMeterMetaInfo->pMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pDataBlock->pMeterMeta);
} else {
assert(strncmp(pMeterMetaInfo->name, pDataBlock->tableId, tListLen(pDataBlock->tableId)) == 0);
}
......@@ -665,7 +665,7 @@ int32_t tscCreateDataBlock(size_t initialSize, int32_t rowSize, int32_t startOff
* due to operation such as drop database. So here we add the reference count directly instead of invoke
* taosGetDataFromCache, which may return NULL value.
*/
dataBuf->pMeterMeta = taosGetDataFromExists(tscCacheHandle, pMeterMeta);
dataBuf->pMeterMeta = taosCacheAcquireByData(tscCacheHandle, pMeterMeta);
assert(initialSize > 0 && pMeterMeta != NULL && dataBuf->pMeterMeta != NULL);
*dataBlocks = dataBuf;
......@@ -1940,8 +1940,8 @@ void tscClearMeterMetaInfo(SMeterMetaInfo* pMeterMetaInfo, bool removeFromCache)
return;
}
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), removeFromCache);
taosRemoveDataFromCache(tscCacheHandle, (void**)&(pMeterMetaInfo->pMetricMeta), removeFromCache);
taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMeterMeta), removeFromCache);
taosCacheRelease(tscCacheHandle, (void**)&(pMeterMetaInfo->pMetricMeta), removeFromCache);
}
void tscResetForNextRetrieve(SSqlRes* pRes) {
......@@ -2071,16 +2071,16 @@ SSqlObj* createSubqueryObj(SSqlObj* pSql, int16_t tableIndex, void (*fp)(), void
SMeterMetaInfo* pFinalInfo = NULL;
if (pPrevSql == NULL) {
STableMeta* pMeterMeta = taosGetDataFromCache(tscCacheHandle, name);
SSuperTableMeta* pMetricMeta = taosGetDataFromCache(tscCacheHandle, key);
STableMeta* pMeterMeta = taosCacheAcquireByName(tscCacheHandle, name);
SSuperTableMeta* pMetricMeta = taosCacheAcquireByName(tscCacheHandle, key);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pMeterMeta, pMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
} else { // transfer the ownership of pMeterMeta/pMetricMeta to the newly create sql object.
SMeterMetaInfo* pPrevInfo = tscGetMeterMetaInfo(&pPrevSql->cmd, pPrevSql->cmd.clauseIndex, 0);
STableMeta* pPrevMeterMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta);
SSuperTableMeta* pPrevMetricMeta = taosTransferDataInCache(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
STableMeta* pPrevMeterMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMeterMeta);
SSuperTableMeta* pPrevMetricMeta = taosCacheTransfer(tscCacheHandle, (void**)&pPrevInfo->pMetricMeta);
pFinalInfo = tscAddMeterMetaInfo(pNewQueryInfo, name, pPrevMeterMeta, pPrevMetricMeta, pMeterMetaInfo->numOfTags,
pMeterMetaInfo->tagColumnIndex);
......
......@@ -20,7 +20,63 @@
extern "C" {
#endif
#include <stdbool.h>
#include "os.h"
#include "tref.h"
#include "hash.h"
typedef struct SCacheStatis {
int64_t missCount;
int64_t hitCount;
int64_t totalAccess;
int64_t refreshCount;
int32_t numOfCollision;
} SCacheStatis;
typedef struct SCacheDataNode {
uint64_t addedTime; // the added time when this element is added or updated into cache
uint64_t expiredTime; // expiredTime expiredTime when this element should be remove from cache
uint64_t signature;
uint32_t size; // allocated size for current SCacheDataNode
uint16_t keySize : 15;
bool inTrash : 1; // denote if it is in trash or not
T_REF_DECLARE()
char *key;
char data[];
} SCacheDataNode;
typedef struct STrashElem {
struct STrashElem *prev;
struct STrashElem *next;
SCacheDataNode * pData;
} STrashElem;
typedef struct {
int64_t totalSize; // total allocated buffer in this hash table, SCacheObj is not included.
int64_t refreshTime;
/*
* to accommodate the old datanode which has the same key value of new one in hashList
* when an new node is put into cache, if an existed one with the same key:
* 1. if the old one does not be referenced, update it.
* 2. otherwise, move the old one to pTrash, addedTime the new one.
*
* when the node in pTrash does not be referenced, it will be release at the expired expiredTime
*/
STrashElem * pTrash;
void * tmrCtrl;
void * pTimer;
SCacheStatis statistics;
SHashObj * pHashTable;
int numOfElemsInTrash; // number of element in trash
int16_t deleting; // set the deleting flag to stop refreshing ASAP.
#if defined(LINUX)
pthread_rwlock_t lock;
#else
pthread_mutex_t lock;
#endif
} SCacheObj;
/**
*
......@@ -30,7 +86,7 @@ extern "C" {
* not referenced by other objects
* @return
*/
void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTimeInSeconds);
SCacheObj *taosCacheInit(void *tmrCtrl, int64_t refreshTimeInSeconds);
/**
* add data into cache
......@@ -42,28 +98,7 @@ void *taosInitDataCache(int maxSessions, void *tmrCtrl, int64_t refreshTimeInSec
* @param keepTime survival time in second
* @return cached element
*/
void *taosAddDataIntoCache(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds);
/**
* remove data in cache, the data will not be removed immediately.
* if it is referenced by other object, it will be remain in cache
* @param handle cache object
* @param data not the key, actually referenced data
* @param _remove force model, reduce the ref count and move the data into
* pTrash
*/
void taosRemoveDataFromCache(void *handle, void **data, bool _remove);
/**
* update data in cache
* @param handle hash object handle(pointer)
* @param key key for hash
* @param pData actually data
* @param size length of data
* @param duration survival time of this object in cache
* @return new referenced data
*/
void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, int duration);
void *taosCachePut(void *handle, char *key, char *pData, int dataSize, int keepTimeInSeconds);
/**
* get data from cache
......@@ -71,40 +106,56 @@ void *taosUpdateDataFromCache(void *handle, char *key, char *pData, int size, in
* @param key key
* @return cached data or NULL
*/
void *taosGetDataFromCache(void *handle, char *key);
void *taosCacheAcquireByName(void *handle, char *key);
/**
* release all allocated memory and destroy the cache object
* Add one reference count for the exist data, and assign this data for a new owner.
* The new owner needs to invoke the taosCacheRelease when it does not need this data anymore.
* This procedure is a faster version of taosCacheAcquireByName function, which avoids the sideeffect of the problem of
* the data is moved to trash, and taosCacheAcquireByName will fail to retrieve it again.
*
* @param handle
* @param data
* @return
*/
void taosCleanUpDataCache(void *handle);
void *taosCacheAcquireByData(void *handle, void *data);
/**
* move all data node into trash,clear node in trash can if it is not referenced by client
* transfer the ownership of data in cache to another object without increasing reference count.
* @param handle
* @param data
* @return
*/
void taosClearDataCache(void *handle);
void *taosCacheTransfer(void *handle, void **data);
/**
* Add one reference count for the exist data, and assign this data for a new owner.
* The new owner needs to invoke the taosRemoveDataFromCache when it does not need this data anymore.
* This procedure is a faster version of taosGetDataFromCache function, which avoids the sideeffect of the problem of the
* data is moved to trash, and taosGetDataFromCache will fail to retrieve it again.
*
* remove data in cache, the data will not be removed immediately.
* if it is referenced by other object, it will be remain in cache
* @param handle cache object
* @param data not the key, actually referenced data
* @param _remove force model, reduce the ref count and move the data into
* pTrash
*/
void taosCacheRelease(void *handle, void **data, bool _remove);
/**
* move all data node into trash, clear node in trash can if it is not referenced by any clients
* @param handle
* @param data
* @return
*/
void* taosGetDataFromExists(void* handle, void* data);
void taosCacheEmpty(SCacheObj *pCacheObj);
/**
* transfer the ownership of data in cache to another object without increasing reference count.
* release all allocated memory and destroy the cache object.
*
* This function only set the deleting flag, and the specific work of clean up cache is delegated to
* taosCacheRefresh function, which will executed every SCacheObj->refreshTime sec.
*
* If the value of SCacheObj->refreshTime is too large, the taosCacheRefresh function may not be invoked
* before the main thread terminated, in which case all allocated resources are simply recycled by OS.
*
* @param handle
* @param data
* @return
*/
void* taosTransferDataInCache(void* handle, void** data);
void taosCacheCleanup(SCacheObj *pCacheObj);
#ifdef __cplusplus
}
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册